1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 void 64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 65 { 66 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 67 free(cache_buffer); 68 } 69 70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 71 72 struct spdk_file { 73 struct spdk_filesystem *fs; 74 struct spdk_blob *blob; 75 char *name; 76 uint64_t length; 77 bool is_deleted; 78 bool open_for_writing; 79 uint64_t length_flushed; 80 uint64_t append_pos; 81 uint64_t seq_byte_count; 82 uint64_t next_seq_offset; 83 uint32_t priority; 84 TAILQ_ENTRY(spdk_file) tailq; 85 spdk_blob_id blobid; 86 uint32_t ref_count; 87 pthread_spinlock_t lock; 88 struct cache_buffer *last; 89 struct cache_tree *tree; 90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 92 TAILQ_ENTRY(spdk_file) cache_tailq; 93 }; 94 95 struct spdk_deleted_file { 96 spdk_blob_id id; 97 TAILQ_ENTRY(spdk_deleted_file) tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 struct iovec *iovs; 138 uint32_t iovcnt; 139 struct iovec iov; 140 union { 141 struct { 142 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 143 } fs_load; 144 struct { 145 uint64_t length; 146 } truncate; 147 struct { 148 struct spdk_io_channel *channel; 149 void *pin_buf; 150 int is_read; 151 off_t offset; 152 size_t length; 153 uint64_t start_lba; 154 uint64_t num_lba; 155 uint32_t blocklen; 156 } rw; 157 struct { 158 const char *old_name; 159 const char *new_name; 160 } rename; 161 struct { 162 struct cache_buffer *cache_buffer; 163 uint64_t length; 164 } flush; 165 struct { 166 struct cache_buffer *cache_buffer; 167 uint64_t length; 168 uint64_t offset; 169 } readahead; 170 struct { 171 uint64_t offset; 172 TAILQ_ENTRY(spdk_fs_request) tailq; 173 bool xattr_in_progress; 174 } sync; 175 struct { 176 uint32_t num_clusters; 177 } resize; 178 struct { 179 const char *name; 180 uint32_t flags; 181 TAILQ_ENTRY(spdk_fs_request) tailq; 182 } open; 183 struct { 184 const char *name; 185 struct spdk_blob *blob; 186 } create; 187 struct { 188 const char *name; 189 } delete; 190 struct { 191 const char *name; 192 } stat; 193 } op; 194 }; 195 196 static void cache_free_buffers(struct spdk_file *file); 197 198 void 199 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 200 { 201 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 202 } 203 204 static void 205 __initialize_cache(void) 206 { 207 assert(g_cache_pool == NULL); 208 209 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 210 g_fs_cache_size / CACHE_BUFFER_SIZE, 211 CACHE_BUFFER_SIZE, 212 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 213 SPDK_ENV_SOCKET_ID_ANY); 214 if (!g_cache_pool) { 215 SPDK_ERRLOG("Create mempool failed, you may " 216 "increase the memory and try again\n"); 217 assert(false); 218 } 219 TAILQ_INIT(&g_caches); 220 pthread_spin_init(&g_caches_lock, 0); 221 } 222 223 static void 224 __free_cache(void) 225 { 226 assert(g_cache_pool != NULL); 227 228 spdk_mempool_free(g_cache_pool); 229 g_cache_pool = NULL; 230 } 231 232 static uint64_t 233 __file_get_blob_size(struct spdk_file *file) 234 { 235 uint64_t cluster_sz; 236 237 cluster_sz = file->fs->bs_opts.cluster_sz; 238 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 239 } 240 241 struct spdk_fs_request { 242 struct spdk_fs_cb_args args; 243 TAILQ_ENTRY(spdk_fs_request) link; 244 struct spdk_fs_channel *channel; 245 }; 246 247 struct spdk_fs_channel { 248 struct spdk_fs_request *req_mem; 249 TAILQ_HEAD(, spdk_fs_request) reqs; 250 sem_t sem; 251 struct spdk_filesystem *fs; 252 struct spdk_io_channel *bs_channel; 253 fs_send_request_fn send_request; 254 bool sync; 255 pthread_spinlock_t lock; 256 }; 257 258 /* For now, this is effectively an alias. But eventually we'll shift 259 * some data members over. */ 260 struct spdk_fs_thread_ctx { 261 struct spdk_fs_channel ch; 262 }; 263 264 static struct spdk_fs_request * 265 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 266 { 267 struct spdk_fs_request *req; 268 struct iovec *iovs = NULL; 269 270 if (iovcnt > 1) { 271 iovs = calloc(iovcnt, sizeof(struct iovec)); 272 if (!iovs) { 273 return NULL; 274 } 275 } 276 277 if (channel->sync) { 278 pthread_spin_lock(&channel->lock); 279 } 280 281 req = TAILQ_FIRST(&channel->reqs); 282 if (req) { 283 TAILQ_REMOVE(&channel->reqs, req, link); 284 } 285 286 if (channel->sync) { 287 pthread_spin_unlock(&channel->lock); 288 } 289 290 if (req == NULL) { 291 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 292 free(iovs); 293 return NULL; 294 } 295 memset(req, 0, sizeof(*req)); 296 req->channel = channel; 297 if (iovcnt > 1) { 298 req->args.iovs = iovs; 299 } else { 300 req->args.iovs = &req->args.iov; 301 } 302 req->args.iovcnt = iovcnt; 303 304 return req; 305 } 306 307 static struct spdk_fs_request * 308 alloc_fs_request(struct spdk_fs_channel *channel) 309 { 310 return alloc_fs_request_with_iov(channel, 0); 311 } 312 313 static void 314 free_fs_request(struct spdk_fs_request *req) 315 { 316 struct spdk_fs_channel *channel = req->channel; 317 318 if (req->args.iovcnt > 1) { 319 free(req->args.iovs); 320 } 321 322 if (channel->sync) { 323 pthread_spin_lock(&channel->lock); 324 } 325 326 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 327 328 if (channel->sync) { 329 pthread_spin_unlock(&channel->lock); 330 } 331 } 332 333 static int 334 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 335 uint32_t max_ops) 336 { 337 uint32_t i; 338 339 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 340 if (!channel->req_mem) { 341 return -1; 342 } 343 344 TAILQ_INIT(&channel->reqs); 345 sem_init(&channel->sem, 0, 0); 346 347 for (i = 0; i < max_ops; i++) { 348 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 349 } 350 351 channel->fs = fs; 352 353 return 0; 354 } 355 356 static int 357 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 358 { 359 struct spdk_filesystem *fs; 360 struct spdk_fs_channel *channel = ctx_buf; 361 362 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 363 364 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 365 } 366 367 static int 368 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 369 { 370 struct spdk_filesystem *fs; 371 struct spdk_fs_channel *channel = ctx_buf; 372 373 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 374 375 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 376 } 377 378 static int 379 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 380 { 381 struct spdk_filesystem *fs; 382 struct spdk_fs_channel *channel = ctx_buf; 383 384 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 385 386 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 387 } 388 389 static void 390 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 391 { 392 struct spdk_fs_channel *channel = ctx_buf; 393 394 free(channel->req_mem); 395 if (channel->bs_channel != NULL) { 396 spdk_bs_free_io_channel(channel->bs_channel); 397 } 398 } 399 400 static void 401 __send_request_direct(fs_request_fn fn, void *arg) 402 { 403 fn(arg); 404 } 405 406 static void 407 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 408 { 409 fs->bs = bs; 410 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 411 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 412 fs->md_target.md_fs_channel->send_request = __send_request_direct; 413 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 414 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 415 416 pthread_mutex_lock(&g_cache_init_lock); 417 if (g_fs_count == 0) { 418 __initialize_cache(); 419 } 420 g_fs_count++; 421 pthread_mutex_unlock(&g_cache_init_lock); 422 } 423 424 static void 425 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 426 { 427 struct spdk_fs_request *req = ctx; 428 struct spdk_fs_cb_args *args = &req->args; 429 struct spdk_filesystem *fs = args->fs; 430 431 if (bserrno == 0) { 432 common_fs_bs_init(fs, bs); 433 } else { 434 free(fs); 435 fs = NULL; 436 } 437 438 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 439 free_fs_request(req); 440 } 441 442 static void 443 fs_conf_parse(void) 444 { 445 struct spdk_conf_section *sp; 446 447 sp = spdk_conf_find_section(NULL, "Blobfs"); 448 if (sp == NULL) { 449 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 450 return; 451 } 452 453 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 454 if (g_fs_cache_buffer_shift <= 0) { 455 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 456 } 457 } 458 459 static struct spdk_filesystem * 460 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 461 { 462 struct spdk_filesystem *fs; 463 464 fs = calloc(1, sizeof(*fs)); 465 if (fs == NULL) { 466 return NULL; 467 } 468 469 fs->bdev = dev; 470 fs->send_request = send_request_fn; 471 TAILQ_INIT(&fs->files); 472 473 fs->md_target.max_ops = 512; 474 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 475 sizeof(struct spdk_fs_channel), "blobfs_md"); 476 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 477 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 478 479 fs->sync_target.max_ops = 512; 480 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 481 sizeof(struct spdk_fs_channel), "blobfs_sync"); 482 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 483 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 484 485 fs->io_target.max_ops = 512; 486 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 487 sizeof(struct spdk_fs_channel), "blobfs_io"); 488 489 return fs; 490 } 491 492 static void 493 __wake_caller(void *arg, int fserrno) 494 { 495 struct spdk_fs_cb_args *args = arg; 496 497 args->rc = fserrno; 498 sem_post(args->sem); 499 } 500 501 void 502 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 503 fs_send_request_fn send_request_fn, 504 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 505 { 506 struct spdk_filesystem *fs; 507 struct spdk_fs_request *req; 508 struct spdk_fs_cb_args *args; 509 struct spdk_bs_opts opts = {}; 510 511 fs = fs_alloc(dev, send_request_fn); 512 if (fs == NULL) { 513 cb_fn(cb_arg, NULL, -ENOMEM); 514 return; 515 } 516 517 fs_conf_parse(); 518 519 req = alloc_fs_request(fs->md_target.md_fs_channel); 520 if (req == NULL) { 521 spdk_put_io_channel(fs->md_target.md_io_channel); 522 spdk_io_device_unregister(&fs->md_target, NULL); 523 spdk_put_io_channel(fs->sync_target.sync_io_channel); 524 spdk_io_device_unregister(&fs->sync_target, NULL); 525 spdk_io_device_unregister(&fs->io_target, NULL); 526 free(fs); 527 cb_fn(cb_arg, NULL, -ENOMEM); 528 return; 529 } 530 531 args = &req->args; 532 args->fn.fs_op_with_handle = cb_fn; 533 args->arg = cb_arg; 534 args->fs = fs; 535 536 spdk_bs_opts_init(&opts); 537 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 538 if (opt) { 539 opts.cluster_sz = opt->cluster_sz; 540 } 541 spdk_bs_init(dev, &opts, init_cb, req); 542 } 543 544 static struct spdk_file * 545 file_alloc(struct spdk_filesystem *fs) 546 { 547 struct spdk_file *file; 548 549 file = calloc(1, sizeof(*file)); 550 if (file == NULL) { 551 return NULL; 552 } 553 554 file->tree = calloc(1, sizeof(*file->tree)); 555 if (file->tree == NULL) { 556 free(file); 557 return NULL; 558 } 559 560 file->fs = fs; 561 TAILQ_INIT(&file->open_requests); 562 TAILQ_INIT(&file->sync_requests); 563 pthread_spin_init(&file->lock, 0); 564 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 565 file->priority = SPDK_FILE_PRIORITY_LOW; 566 return file; 567 } 568 569 static void fs_load_done(void *ctx, int bserrno); 570 571 static int 572 _handle_deleted_files(struct spdk_fs_request *req) 573 { 574 struct spdk_fs_cb_args *args = &req->args; 575 struct spdk_filesystem *fs = args->fs; 576 577 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 578 struct spdk_deleted_file *deleted_file; 579 580 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 581 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 582 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 583 free(deleted_file); 584 return 0; 585 } 586 587 return 1; 588 } 589 590 static void 591 fs_load_done(void *ctx, int bserrno) 592 { 593 struct spdk_fs_request *req = ctx; 594 struct spdk_fs_cb_args *args = &req->args; 595 struct spdk_filesystem *fs = args->fs; 596 597 /* The filesystem has been loaded. Now check if there are any files that 598 * were marked for deletion before last unload. Do not complete the 599 * fs_load callback until all of them have been deleted on disk. 600 */ 601 if (_handle_deleted_files(req) == 0) { 602 /* We found a file that's been marked for deleting but not actually 603 * deleted yet. This function will get called again once the delete 604 * operation is completed. 605 */ 606 return; 607 } 608 609 args->fn.fs_op_with_handle(args->arg, fs, 0); 610 free_fs_request(req); 611 612 } 613 614 static void 615 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 616 { 617 struct spdk_fs_request *req = ctx; 618 struct spdk_fs_cb_args *args = &req->args; 619 struct spdk_filesystem *fs = args->fs; 620 uint64_t *length; 621 const char *name; 622 uint32_t *is_deleted; 623 size_t value_len; 624 625 if (rc < 0) { 626 args->fn.fs_op_with_handle(args->arg, fs, rc); 627 free_fs_request(req); 628 return; 629 } 630 631 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 632 if (rc < 0) { 633 args->fn.fs_op_with_handle(args->arg, fs, rc); 634 free_fs_request(req); 635 return; 636 } 637 638 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 639 if (rc < 0) { 640 args->fn.fs_op_with_handle(args->arg, fs, rc); 641 free_fs_request(req); 642 return; 643 } 644 645 assert(value_len == 8); 646 647 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 648 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 649 if (rc < 0) { 650 struct spdk_file *f; 651 652 f = file_alloc(fs); 653 if (f == NULL) { 654 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 655 free_fs_request(req); 656 return; 657 } 658 659 f->name = strdup(name); 660 f->blobid = spdk_blob_get_id(blob); 661 f->length = *length; 662 f->length_flushed = *length; 663 f->append_pos = *length; 664 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 665 } else { 666 struct spdk_deleted_file *deleted_file; 667 668 deleted_file = calloc(1, sizeof(*deleted_file)); 669 if (deleted_file == NULL) { 670 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 671 free_fs_request(req); 672 return; 673 } 674 deleted_file->id = spdk_blob_get_id(blob); 675 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 676 } 677 } 678 679 static void 680 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 681 { 682 struct spdk_fs_request *req = ctx; 683 struct spdk_fs_cb_args *args = &req->args; 684 struct spdk_filesystem *fs = args->fs; 685 struct spdk_bs_type bstype; 686 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 687 static const struct spdk_bs_type zeros; 688 689 if (bserrno != 0) { 690 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 691 free_fs_request(req); 692 free(fs); 693 return; 694 } 695 696 bstype = spdk_bs_get_bstype(bs); 697 698 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 699 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 700 spdk_bs_set_bstype(bs, blobfs_type); 701 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 702 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 703 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 704 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 705 free_fs_request(req); 706 free(fs); 707 return; 708 } 709 710 common_fs_bs_init(fs, bs); 711 fs_load_done(req, 0); 712 } 713 714 static void 715 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 716 { 717 assert(fs != NULL); 718 spdk_io_device_unregister(&fs->md_target, NULL); 719 spdk_io_device_unregister(&fs->sync_target, NULL); 720 spdk_io_device_unregister(&fs->io_target, NULL); 721 free(fs); 722 } 723 724 static void 725 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 726 { 727 assert(fs != NULL); 728 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 729 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 730 } 731 732 void 733 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 734 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 735 { 736 struct spdk_filesystem *fs; 737 struct spdk_fs_cb_args *args; 738 struct spdk_fs_request *req; 739 struct spdk_bs_opts bs_opts; 740 741 fs = fs_alloc(dev, send_request_fn); 742 if (fs == NULL) { 743 cb_fn(cb_arg, NULL, -ENOMEM); 744 return; 745 } 746 747 fs_conf_parse(); 748 749 req = alloc_fs_request(fs->md_target.md_fs_channel); 750 if (req == NULL) { 751 spdk_fs_free_io_channels(fs); 752 spdk_fs_io_device_unregister(fs); 753 cb_fn(cb_arg, NULL, -ENOMEM); 754 return; 755 } 756 757 args = &req->args; 758 args->fn.fs_op_with_handle = cb_fn; 759 args->arg = cb_arg; 760 args->fs = fs; 761 TAILQ_INIT(&args->op.fs_load.deleted_files); 762 spdk_bs_opts_init(&bs_opts); 763 bs_opts.iter_cb_fn = iter_cb; 764 bs_opts.iter_cb_arg = req; 765 spdk_bs_load(dev, &bs_opts, load_cb, req); 766 } 767 768 static void 769 unload_cb(void *ctx, int bserrno) 770 { 771 struct spdk_fs_request *req = ctx; 772 struct spdk_fs_cb_args *args = &req->args; 773 struct spdk_filesystem *fs = args->fs; 774 struct spdk_file *file, *tmp; 775 776 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 777 TAILQ_REMOVE(&fs->files, file, tailq); 778 cache_free_buffers(file); 779 free(file->name); 780 free(file->tree); 781 free(file); 782 } 783 784 pthread_mutex_lock(&g_cache_init_lock); 785 g_fs_count--; 786 if (g_fs_count == 0) { 787 __free_cache(); 788 } 789 pthread_mutex_unlock(&g_cache_init_lock); 790 791 args->fn.fs_op(args->arg, bserrno); 792 free(req); 793 794 spdk_fs_io_device_unregister(fs); 795 } 796 797 void 798 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 799 { 800 struct spdk_fs_request *req; 801 struct spdk_fs_cb_args *args; 802 803 /* 804 * We must free the md_channel before unloading the blobstore, so just 805 * allocate this request from the general heap. 806 */ 807 req = calloc(1, sizeof(*req)); 808 if (req == NULL) { 809 cb_fn(cb_arg, -ENOMEM); 810 return; 811 } 812 813 args = &req->args; 814 args->fn.fs_op = cb_fn; 815 args->arg = cb_arg; 816 args->fs = fs; 817 818 spdk_fs_free_io_channels(fs); 819 spdk_bs_unload(fs->bs, unload_cb, req); 820 } 821 822 static struct spdk_file * 823 fs_find_file(struct spdk_filesystem *fs, const char *name) 824 { 825 struct spdk_file *file; 826 827 TAILQ_FOREACH(file, &fs->files, tailq) { 828 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 829 return file; 830 } 831 } 832 833 return NULL; 834 } 835 836 void 837 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 838 spdk_file_stat_op_complete cb_fn, void *cb_arg) 839 { 840 struct spdk_file_stat stat; 841 struct spdk_file *f = NULL; 842 843 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 844 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 845 return; 846 } 847 848 f = fs_find_file(fs, name); 849 if (f != NULL) { 850 stat.blobid = f->blobid; 851 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 852 cb_fn(cb_arg, &stat, 0); 853 return; 854 } 855 856 cb_fn(cb_arg, NULL, -ENOENT); 857 } 858 859 static void 860 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 861 { 862 struct spdk_fs_request *req = arg; 863 struct spdk_fs_cb_args *args = &req->args; 864 865 args->rc = fserrno; 866 if (fserrno == 0) { 867 memcpy(args->arg, stat, sizeof(*stat)); 868 } 869 sem_post(args->sem); 870 } 871 872 static void 873 __file_stat(void *arg) 874 { 875 struct spdk_fs_request *req = arg; 876 struct spdk_fs_cb_args *args = &req->args; 877 878 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 879 args->fn.stat_op, req); 880 } 881 882 int 883 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 884 const char *name, struct spdk_file_stat *stat) 885 { 886 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 887 struct spdk_fs_request *req; 888 int rc; 889 890 req = alloc_fs_request(channel); 891 if (req == NULL) { 892 return -ENOMEM; 893 } 894 895 req->args.fs = fs; 896 req->args.op.stat.name = name; 897 req->args.fn.stat_op = __copy_stat; 898 req->args.arg = stat; 899 req->args.sem = &channel->sem; 900 channel->send_request(__file_stat, req); 901 sem_wait(&channel->sem); 902 903 rc = req->args.rc; 904 free_fs_request(req); 905 906 return rc; 907 } 908 909 static void 910 fs_create_blob_close_cb(void *ctx, int bserrno) 911 { 912 int rc; 913 struct spdk_fs_request *req = ctx; 914 struct spdk_fs_cb_args *args = &req->args; 915 916 rc = args->rc ? args->rc : bserrno; 917 args->fn.file_op(args->arg, rc); 918 free_fs_request(req); 919 } 920 921 static void 922 fs_create_blob_resize_cb(void *ctx, int bserrno) 923 { 924 struct spdk_fs_request *req = ctx; 925 struct spdk_fs_cb_args *args = &req->args; 926 struct spdk_file *f = args->file; 927 struct spdk_blob *blob = args->op.create.blob; 928 uint64_t length = 0; 929 930 args->rc = bserrno; 931 if (bserrno) { 932 spdk_blob_close(blob, fs_create_blob_close_cb, args); 933 return; 934 } 935 936 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 937 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 938 939 spdk_blob_close(blob, fs_create_blob_close_cb, args); 940 } 941 942 static void 943 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 944 { 945 struct spdk_fs_request *req = ctx; 946 struct spdk_fs_cb_args *args = &req->args; 947 948 if (bserrno) { 949 args->fn.file_op(args->arg, bserrno); 950 free_fs_request(req); 951 return; 952 } 953 954 args->op.create.blob = blob; 955 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 956 } 957 958 static void 959 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 960 { 961 struct spdk_fs_request *req = ctx; 962 struct spdk_fs_cb_args *args = &req->args; 963 struct spdk_file *f = args->file; 964 965 if (bserrno) { 966 args->fn.file_op(args->arg, bserrno); 967 free_fs_request(req); 968 return; 969 } 970 971 f->blobid = blobid; 972 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 973 } 974 975 void 976 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 977 spdk_file_op_complete cb_fn, void *cb_arg) 978 { 979 struct spdk_file *file; 980 struct spdk_fs_request *req; 981 struct spdk_fs_cb_args *args; 982 983 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 984 cb_fn(cb_arg, -ENAMETOOLONG); 985 return; 986 } 987 988 file = fs_find_file(fs, name); 989 if (file != NULL) { 990 cb_fn(cb_arg, -EEXIST); 991 return; 992 } 993 994 file = file_alloc(fs); 995 if (file == NULL) { 996 cb_fn(cb_arg, -ENOMEM); 997 return; 998 } 999 1000 req = alloc_fs_request(fs->md_target.md_fs_channel); 1001 if (req == NULL) { 1002 cb_fn(cb_arg, -ENOMEM); 1003 return; 1004 } 1005 1006 args = &req->args; 1007 args->file = file; 1008 args->fn.file_op = cb_fn; 1009 args->arg = cb_arg; 1010 1011 file->name = strdup(name); 1012 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1013 } 1014 1015 static void 1016 __fs_create_file_done(void *arg, int fserrno) 1017 { 1018 struct spdk_fs_request *req = arg; 1019 struct spdk_fs_cb_args *args = &req->args; 1020 1021 args->rc = fserrno; 1022 sem_post(args->sem); 1023 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1024 } 1025 1026 static void 1027 __fs_create_file(void *arg) 1028 { 1029 struct spdk_fs_request *req = arg; 1030 struct spdk_fs_cb_args *args = &req->args; 1031 1032 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1033 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1034 } 1035 1036 int 1037 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1038 { 1039 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1040 struct spdk_fs_request *req; 1041 struct spdk_fs_cb_args *args; 1042 int rc; 1043 1044 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1045 1046 req = alloc_fs_request(channel); 1047 if (req == NULL) { 1048 return -ENOMEM; 1049 } 1050 1051 args = &req->args; 1052 args->fs = fs; 1053 args->op.create.name = name; 1054 args->sem = &channel->sem; 1055 fs->send_request(__fs_create_file, req); 1056 sem_wait(&channel->sem); 1057 rc = args->rc; 1058 free_fs_request(req); 1059 1060 return rc; 1061 } 1062 1063 static void 1064 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1065 { 1066 struct spdk_fs_request *req = ctx; 1067 struct spdk_fs_cb_args *args = &req->args; 1068 struct spdk_file *f = args->file; 1069 1070 f->blob = blob; 1071 while (!TAILQ_EMPTY(&f->open_requests)) { 1072 req = TAILQ_FIRST(&f->open_requests); 1073 args = &req->args; 1074 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1075 args->fn.file_op_with_handle(args->arg, f, bserrno); 1076 free_fs_request(req); 1077 } 1078 } 1079 1080 static void 1081 fs_open_blob_create_cb(void *ctx, int bserrno) 1082 { 1083 struct spdk_fs_request *req = ctx; 1084 struct spdk_fs_cb_args *args = &req->args; 1085 struct spdk_file *file = args->file; 1086 struct spdk_filesystem *fs = args->fs; 1087 1088 if (file == NULL) { 1089 /* 1090 * This is from an open with CREATE flag - the file 1091 * is now created so look it up in the file list for this 1092 * filesystem. 1093 */ 1094 file = fs_find_file(fs, args->op.open.name); 1095 assert(file != NULL); 1096 args->file = file; 1097 } 1098 1099 file->ref_count++; 1100 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1101 if (file->ref_count == 1) { 1102 assert(file->blob == NULL); 1103 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1104 } else if (file->blob != NULL) { 1105 fs_open_blob_done(req, file->blob, 0); 1106 } else { 1107 /* 1108 * The blob open for this file is in progress due to a previous 1109 * open request. When that open completes, it will invoke the 1110 * open callback for this request. 1111 */ 1112 } 1113 } 1114 1115 void 1116 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1117 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1118 { 1119 struct spdk_file *f = NULL; 1120 struct spdk_fs_request *req; 1121 struct spdk_fs_cb_args *args; 1122 1123 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1124 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1125 return; 1126 } 1127 1128 f = fs_find_file(fs, name); 1129 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1130 cb_fn(cb_arg, NULL, -ENOENT); 1131 return; 1132 } 1133 1134 if (f != NULL && f->is_deleted == true) { 1135 cb_fn(cb_arg, NULL, -ENOENT); 1136 return; 1137 } 1138 1139 req = alloc_fs_request(fs->md_target.md_fs_channel); 1140 if (req == NULL) { 1141 cb_fn(cb_arg, NULL, -ENOMEM); 1142 return; 1143 } 1144 1145 args = &req->args; 1146 args->fn.file_op_with_handle = cb_fn; 1147 args->arg = cb_arg; 1148 args->file = f; 1149 args->fs = fs; 1150 args->op.open.name = name; 1151 1152 if (f == NULL) { 1153 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1154 } else { 1155 fs_open_blob_create_cb(req, 0); 1156 } 1157 } 1158 1159 static void 1160 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1161 { 1162 struct spdk_fs_request *req = arg; 1163 struct spdk_fs_cb_args *args = &req->args; 1164 1165 args->file = file; 1166 __wake_caller(args, bserrno); 1167 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1168 } 1169 1170 static void 1171 __fs_open_file(void *arg) 1172 { 1173 struct spdk_fs_request *req = arg; 1174 struct spdk_fs_cb_args *args = &req->args; 1175 1176 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1177 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1178 __fs_open_file_done, req); 1179 } 1180 1181 int 1182 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1183 const char *name, uint32_t flags, struct spdk_file **file) 1184 { 1185 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1186 struct spdk_fs_request *req; 1187 struct spdk_fs_cb_args *args; 1188 int rc; 1189 1190 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1191 1192 req = alloc_fs_request(channel); 1193 if (req == NULL) { 1194 return -ENOMEM; 1195 } 1196 1197 args = &req->args; 1198 args->fs = fs; 1199 args->op.open.name = name; 1200 args->op.open.flags = flags; 1201 args->sem = &channel->sem; 1202 fs->send_request(__fs_open_file, req); 1203 sem_wait(&channel->sem); 1204 rc = args->rc; 1205 if (rc == 0) { 1206 *file = args->file; 1207 } else { 1208 *file = NULL; 1209 } 1210 free_fs_request(req); 1211 1212 return rc; 1213 } 1214 1215 static void 1216 fs_rename_blob_close_cb(void *ctx, int bserrno) 1217 { 1218 struct spdk_fs_request *req = ctx; 1219 struct spdk_fs_cb_args *args = &req->args; 1220 1221 args->fn.fs_op(args->arg, bserrno); 1222 free_fs_request(req); 1223 } 1224 1225 static void 1226 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1227 { 1228 struct spdk_fs_request *req = ctx; 1229 struct spdk_fs_cb_args *args = &req->args; 1230 const char *new_name = args->op.rename.new_name; 1231 1232 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1233 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1234 } 1235 1236 static void 1237 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1238 { 1239 struct spdk_fs_cb_args *args = &req->args; 1240 struct spdk_file *f; 1241 1242 f = fs_find_file(args->fs, args->op.rename.old_name); 1243 if (f == NULL) { 1244 args->fn.fs_op(args->arg, -ENOENT); 1245 free_fs_request(req); 1246 return; 1247 } 1248 1249 free(f->name); 1250 f->name = strdup(args->op.rename.new_name); 1251 args->file = f; 1252 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1253 } 1254 1255 static void 1256 fs_rename_delete_done(void *arg, int fserrno) 1257 { 1258 __spdk_fs_md_rename_file(arg); 1259 } 1260 1261 void 1262 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1263 const char *old_name, const char *new_name, 1264 spdk_file_op_complete cb_fn, void *cb_arg) 1265 { 1266 struct spdk_file *f; 1267 struct spdk_fs_request *req; 1268 struct spdk_fs_cb_args *args; 1269 1270 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1271 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1272 cb_fn(cb_arg, -ENAMETOOLONG); 1273 return; 1274 } 1275 1276 req = alloc_fs_request(fs->md_target.md_fs_channel); 1277 if (req == NULL) { 1278 cb_fn(cb_arg, -ENOMEM); 1279 return; 1280 } 1281 1282 args = &req->args; 1283 args->fn.fs_op = cb_fn; 1284 args->fs = fs; 1285 args->arg = cb_arg; 1286 args->op.rename.old_name = old_name; 1287 args->op.rename.new_name = new_name; 1288 1289 f = fs_find_file(fs, new_name); 1290 if (f == NULL) { 1291 __spdk_fs_md_rename_file(req); 1292 return; 1293 } 1294 1295 /* 1296 * The rename overwrites an existing file. So delete the existing file, then 1297 * do the actual rename. 1298 */ 1299 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1300 } 1301 1302 static void 1303 __fs_rename_file_done(void *arg, int fserrno) 1304 { 1305 struct spdk_fs_request *req = arg; 1306 struct spdk_fs_cb_args *args = &req->args; 1307 1308 __wake_caller(args, fserrno); 1309 } 1310 1311 static void 1312 __fs_rename_file(void *arg) 1313 { 1314 struct spdk_fs_request *req = arg; 1315 struct spdk_fs_cb_args *args = &req->args; 1316 1317 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1318 __fs_rename_file_done, req); 1319 } 1320 1321 int 1322 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1323 const char *old_name, const char *new_name) 1324 { 1325 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1326 struct spdk_fs_request *req; 1327 struct spdk_fs_cb_args *args; 1328 int rc; 1329 1330 req = alloc_fs_request(channel); 1331 if (req == NULL) { 1332 return -ENOMEM; 1333 } 1334 1335 args = &req->args; 1336 1337 args->fs = fs; 1338 args->op.rename.old_name = old_name; 1339 args->op.rename.new_name = new_name; 1340 args->sem = &channel->sem; 1341 fs->send_request(__fs_rename_file, req); 1342 sem_wait(&channel->sem); 1343 rc = args->rc; 1344 free_fs_request(req); 1345 return rc; 1346 } 1347 1348 static void 1349 blob_delete_cb(void *ctx, int bserrno) 1350 { 1351 struct spdk_fs_request *req = ctx; 1352 struct spdk_fs_cb_args *args = &req->args; 1353 1354 args->fn.file_op(args->arg, bserrno); 1355 free_fs_request(req); 1356 } 1357 1358 void 1359 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1360 spdk_file_op_complete cb_fn, void *cb_arg) 1361 { 1362 struct spdk_file *f; 1363 spdk_blob_id blobid; 1364 struct spdk_fs_request *req; 1365 struct spdk_fs_cb_args *args; 1366 1367 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1368 1369 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1370 cb_fn(cb_arg, -ENAMETOOLONG); 1371 return; 1372 } 1373 1374 f = fs_find_file(fs, name); 1375 if (f == NULL) { 1376 cb_fn(cb_arg, -ENOENT); 1377 return; 1378 } 1379 1380 req = alloc_fs_request(fs->md_target.md_fs_channel); 1381 if (req == NULL) { 1382 cb_fn(cb_arg, -ENOMEM); 1383 return; 1384 } 1385 1386 args = &req->args; 1387 args->fn.file_op = cb_fn; 1388 args->arg = cb_arg; 1389 1390 if (f->ref_count > 0) { 1391 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1392 f->is_deleted = true; 1393 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1394 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1395 return; 1396 } 1397 1398 TAILQ_REMOVE(&fs->files, f, tailq); 1399 1400 cache_free_buffers(f); 1401 1402 blobid = f->blobid; 1403 1404 free(f->name); 1405 free(f->tree); 1406 free(f); 1407 1408 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1409 } 1410 1411 static void 1412 __fs_delete_file_done(void *arg, int fserrno) 1413 { 1414 struct spdk_fs_request *req = arg; 1415 struct spdk_fs_cb_args *args = &req->args; 1416 1417 __wake_caller(args, fserrno); 1418 } 1419 1420 static void 1421 __fs_delete_file(void *arg) 1422 { 1423 struct spdk_fs_request *req = arg; 1424 struct spdk_fs_cb_args *args = &req->args; 1425 1426 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1427 } 1428 1429 int 1430 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1431 const char *name) 1432 { 1433 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1434 struct spdk_fs_request *req; 1435 struct spdk_fs_cb_args *args; 1436 int rc; 1437 1438 req = alloc_fs_request(channel); 1439 if (req == NULL) { 1440 return -ENOMEM; 1441 } 1442 1443 args = &req->args; 1444 args->fs = fs; 1445 args->op.delete.name = name; 1446 args->sem = &channel->sem; 1447 fs->send_request(__fs_delete_file, req); 1448 sem_wait(&channel->sem); 1449 rc = args->rc; 1450 free_fs_request(req); 1451 1452 return rc; 1453 } 1454 1455 spdk_fs_iter 1456 spdk_fs_iter_first(struct spdk_filesystem *fs) 1457 { 1458 struct spdk_file *f; 1459 1460 f = TAILQ_FIRST(&fs->files); 1461 return f; 1462 } 1463 1464 spdk_fs_iter 1465 spdk_fs_iter_next(spdk_fs_iter iter) 1466 { 1467 struct spdk_file *f = iter; 1468 1469 if (f == NULL) { 1470 return NULL; 1471 } 1472 1473 f = TAILQ_NEXT(f, tailq); 1474 return f; 1475 } 1476 1477 const char * 1478 spdk_file_get_name(struct spdk_file *file) 1479 { 1480 return file->name; 1481 } 1482 1483 uint64_t 1484 spdk_file_get_length(struct spdk_file *file) 1485 { 1486 uint64_t length; 1487 1488 assert(file != NULL); 1489 1490 length = file->append_pos >= file->length ? file->append_pos : file->length; 1491 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1492 return length; 1493 } 1494 1495 static void 1496 fs_truncate_complete_cb(void *ctx, int bserrno) 1497 { 1498 struct spdk_fs_request *req = ctx; 1499 struct spdk_fs_cb_args *args = &req->args; 1500 1501 args->fn.file_op(args->arg, bserrno); 1502 free_fs_request(req); 1503 } 1504 1505 static void 1506 fs_truncate_resize_cb(void *ctx, int bserrno) 1507 { 1508 struct spdk_fs_request *req = ctx; 1509 struct spdk_fs_cb_args *args = &req->args; 1510 struct spdk_file *file = args->file; 1511 uint64_t *length = &args->op.truncate.length; 1512 1513 if (bserrno) { 1514 args->fn.file_op(args->arg, bserrno); 1515 free_fs_request(req); 1516 return; 1517 } 1518 1519 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1520 1521 file->length = *length; 1522 if (file->append_pos > file->length) { 1523 file->append_pos = file->length; 1524 } 1525 1526 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1527 } 1528 1529 static uint64_t 1530 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1531 { 1532 return (length + cluster_sz - 1) / cluster_sz; 1533 } 1534 1535 void 1536 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1537 spdk_file_op_complete cb_fn, void *cb_arg) 1538 { 1539 struct spdk_filesystem *fs; 1540 size_t num_clusters; 1541 struct spdk_fs_request *req; 1542 struct spdk_fs_cb_args *args; 1543 1544 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1545 if (length == file->length) { 1546 cb_fn(cb_arg, 0); 1547 return; 1548 } 1549 1550 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1551 if (req == NULL) { 1552 cb_fn(cb_arg, -ENOMEM); 1553 return; 1554 } 1555 1556 args = &req->args; 1557 args->fn.file_op = cb_fn; 1558 args->arg = cb_arg; 1559 args->file = file; 1560 args->op.truncate.length = length; 1561 fs = file->fs; 1562 1563 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1564 1565 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1566 } 1567 1568 static void 1569 __truncate(void *arg) 1570 { 1571 struct spdk_fs_request *req = arg; 1572 struct spdk_fs_cb_args *args = &req->args; 1573 1574 spdk_file_truncate_async(args->file, args->op.truncate.length, 1575 args->fn.file_op, args); 1576 } 1577 1578 int 1579 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1580 uint64_t length) 1581 { 1582 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1583 struct spdk_fs_request *req; 1584 struct spdk_fs_cb_args *args; 1585 int rc; 1586 1587 req = alloc_fs_request(channel); 1588 if (req == NULL) { 1589 return -ENOMEM; 1590 } 1591 1592 args = &req->args; 1593 1594 args->file = file; 1595 args->op.truncate.length = length; 1596 args->fn.file_op = __wake_caller; 1597 args->sem = &channel->sem; 1598 1599 channel->send_request(__truncate, req); 1600 sem_wait(&channel->sem); 1601 rc = args->rc; 1602 free_fs_request(req); 1603 1604 return rc; 1605 } 1606 1607 static void 1608 __rw_done(void *ctx, int bserrno) 1609 { 1610 struct spdk_fs_request *req = ctx; 1611 struct spdk_fs_cb_args *args = &req->args; 1612 1613 spdk_free(args->op.rw.pin_buf); 1614 args->fn.file_op(args->arg, bserrno); 1615 free_fs_request(req); 1616 } 1617 1618 static void 1619 __read_done(void *ctx, int bserrno) 1620 { 1621 struct spdk_fs_request *req = ctx; 1622 struct spdk_fs_cb_args *args = &req->args; 1623 1624 assert(req != NULL); 1625 if (args->op.rw.is_read) { 1626 memcpy(args->iovs[0].iov_base, 1627 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1628 args->iovs[0].iov_len); 1629 __rw_done(req, 0); 1630 } else { 1631 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1632 args->iovs[0].iov_base, 1633 args->iovs[0].iov_len); 1634 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1635 args->op.rw.pin_buf, 1636 args->op.rw.start_lba, args->op.rw.num_lba, 1637 __rw_done, req); 1638 } 1639 } 1640 1641 static void 1642 __do_blob_read(void *ctx, int fserrno) 1643 { 1644 struct spdk_fs_request *req = ctx; 1645 struct spdk_fs_cb_args *args = &req->args; 1646 1647 if (fserrno) { 1648 __rw_done(req, fserrno); 1649 return; 1650 } 1651 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1652 args->op.rw.pin_buf, 1653 args->op.rw.start_lba, args->op.rw.num_lba, 1654 __read_done, req); 1655 } 1656 1657 static void 1658 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1659 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1660 { 1661 uint64_t end_lba; 1662 1663 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1664 *start_lba = offset / *lba_size; 1665 end_lba = (offset + length - 1) / *lba_size; 1666 *num_lba = (end_lba - *start_lba + 1); 1667 } 1668 1669 static void 1670 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1671 void *payload, uint64_t offset, uint64_t length, 1672 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1673 { 1674 struct spdk_fs_request *req; 1675 struct spdk_fs_cb_args *args; 1676 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1677 uint64_t start_lba, num_lba, pin_buf_length; 1678 uint32_t lba_size; 1679 1680 if (is_read && offset + length > file->length) { 1681 cb_fn(cb_arg, -EINVAL); 1682 return; 1683 } 1684 1685 req = alloc_fs_request_with_iov(channel, 1); 1686 if (req == NULL) { 1687 cb_fn(cb_arg, -ENOMEM); 1688 return; 1689 } 1690 1691 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1692 1693 args = &req->args; 1694 args->fn.file_op = cb_fn; 1695 args->arg = cb_arg; 1696 args->file = file; 1697 args->op.rw.channel = channel->bs_channel; 1698 args->iovs[0].iov_base = payload; 1699 args->iovs[0].iov_len = (size_t)length; 1700 args->op.rw.is_read = is_read; 1701 args->op.rw.offset = offset; 1702 args->op.rw.blocklen = lba_size; 1703 1704 pin_buf_length = num_lba * lba_size; 1705 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1706 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1707 if (args->op.rw.pin_buf == NULL) { 1708 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1709 file->name, offset, length); 1710 free_fs_request(req); 1711 cb_fn(cb_arg, -ENOMEM); 1712 return; 1713 } 1714 1715 args->op.rw.start_lba = start_lba; 1716 args->op.rw.num_lba = num_lba; 1717 1718 if (!is_read && file->length < offset + length) { 1719 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1720 } else { 1721 __do_blob_read(req, 0); 1722 } 1723 } 1724 1725 void 1726 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1727 void *payload, uint64_t offset, uint64_t length, 1728 spdk_file_op_complete cb_fn, void *cb_arg) 1729 { 1730 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1731 } 1732 1733 void 1734 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1735 void *payload, uint64_t offset, uint64_t length, 1736 spdk_file_op_complete cb_fn, void *cb_arg) 1737 { 1738 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1739 file->name, offset, length); 1740 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1741 } 1742 1743 struct spdk_io_channel * 1744 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1745 { 1746 struct spdk_io_channel *io_channel; 1747 struct spdk_fs_channel *fs_channel; 1748 1749 io_channel = spdk_get_io_channel(&fs->io_target); 1750 fs_channel = spdk_io_channel_get_ctx(io_channel); 1751 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1752 fs_channel->send_request = __send_request_direct; 1753 1754 return io_channel; 1755 } 1756 1757 void 1758 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1759 { 1760 spdk_put_io_channel(channel); 1761 } 1762 1763 struct spdk_fs_thread_ctx * 1764 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1765 { 1766 struct spdk_fs_thread_ctx *ctx; 1767 1768 ctx = calloc(1, sizeof(*ctx)); 1769 if (!ctx) { 1770 return NULL; 1771 } 1772 1773 _spdk_fs_channel_create(fs, &ctx->ch, 512); 1774 1775 ctx->ch.send_request = fs->send_request; 1776 ctx->ch.sync = 1; 1777 pthread_spin_init(&ctx->ch.lock, 0); 1778 1779 return ctx; 1780 } 1781 1782 1783 void 1784 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1785 { 1786 _spdk_fs_channel_destroy(NULL, &ctx->ch); 1787 free(ctx); 1788 } 1789 1790 void 1791 spdk_fs_set_cache_size(uint64_t size_in_mb) 1792 { 1793 g_fs_cache_size = size_in_mb * 1024 * 1024; 1794 } 1795 1796 uint64_t 1797 spdk_fs_get_cache_size(void) 1798 { 1799 return g_fs_cache_size / (1024 * 1024); 1800 } 1801 1802 static void __file_flush(void *ctx); 1803 1804 static void * 1805 alloc_cache_memory_buffer(struct spdk_file *context) 1806 { 1807 struct spdk_file *file; 1808 void *buf; 1809 1810 buf = spdk_mempool_get(g_cache_pool); 1811 if (buf != NULL) { 1812 return buf; 1813 } 1814 1815 pthread_spin_lock(&g_caches_lock); 1816 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1817 if (!file->open_for_writing && 1818 file->priority == SPDK_FILE_PRIORITY_LOW && 1819 file != context) { 1820 break; 1821 } 1822 } 1823 pthread_spin_unlock(&g_caches_lock); 1824 if (file != NULL) { 1825 cache_free_buffers(file); 1826 buf = spdk_mempool_get(g_cache_pool); 1827 if (buf != NULL) { 1828 return buf; 1829 } 1830 } 1831 1832 pthread_spin_lock(&g_caches_lock); 1833 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1834 if (!file->open_for_writing && file != context) { 1835 break; 1836 } 1837 } 1838 pthread_spin_unlock(&g_caches_lock); 1839 if (file != NULL) { 1840 cache_free_buffers(file); 1841 buf = spdk_mempool_get(g_cache_pool); 1842 if (buf != NULL) { 1843 return buf; 1844 } 1845 } 1846 1847 pthread_spin_lock(&g_caches_lock); 1848 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1849 if (file != context) { 1850 break; 1851 } 1852 } 1853 pthread_spin_unlock(&g_caches_lock); 1854 if (file != NULL) { 1855 cache_free_buffers(file); 1856 buf = spdk_mempool_get(g_cache_pool); 1857 if (buf != NULL) { 1858 return buf; 1859 } 1860 } 1861 1862 return NULL; 1863 } 1864 1865 static struct cache_buffer * 1866 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1867 { 1868 struct cache_buffer *buf; 1869 int count = 0; 1870 1871 buf = calloc(1, sizeof(*buf)); 1872 if (buf == NULL) { 1873 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1874 return NULL; 1875 } 1876 1877 buf->buf = alloc_cache_memory_buffer(file); 1878 while (buf->buf == NULL) { 1879 /* 1880 * TODO: alloc_cache_memory_buffer() should eventually free 1881 * some buffers. Need a more sophisticated check here, instead 1882 * of just bailing if 100 tries does not result in getting a 1883 * free buffer. This will involve using the sync channel's 1884 * semaphore to block until a buffer becomes available. 1885 */ 1886 if (count++ == 100) { 1887 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 1888 file, offset); 1889 free(buf); 1890 return NULL; 1891 } 1892 buf->buf = alloc_cache_memory_buffer(file); 1893 } 1894 1895 buf->buf_size = CACHE_BUFFER_SIZE; 1896 buf->offset = offset; 1897 1898 pthread_spin_lock(&g_caches_lock); 1899 if (file->tree->present_mask == 0) { 1900 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1901 } 1902 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1903 pthread_spin_unlock(&g_caches_lock); 1904 1905 return buf; 1906 } 1907 1908 static struct cache_buffer * 1909 cache_append_buffer(struct spdk_file *file) 1910 { 1911 struct cache_buffer *last; 1912 1913 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1914 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1915 1916 last = cache_insert_buffer(file, file->append_pos); 1917 if (last == NULL) { 1918 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1919 return NULL; 1920 } 1921 1922 file->last = last; 1923 1924 return last; 1925 } 1926 1927 static void __check_sync_reqs(struct spdk_file *file); 1928 1929 static void 1930 __file_cache_finish_sync(void *ctx, int bserrno) 1931 { 1932 struct spdk_file *file = ctx; 1933 struct spdk_fs_request *sync_req; 1934 struct spdk_fs_cb_args *sync_args; 1935 1936 pthread_spin_lock(&file->lock); 1937 sync_req = TAILQ_FIRST(&file->sync_requests); 1938 sync_args = &sync_req->args; 1939 assert(sync_args->op.sync.offset <= file->length_flushed); 1940 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1941 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1942 pthread_spin_unlock(&file->lock); 1943 1944 sync_args->fn.file_op(sync_args->arg, bserrno); 1945 __check_sync_reqs(file); 1946 1947 pthread_spin_lock(&file->lock); 1948 free_fs_request(sync_req); 1949 pthread_spin_unlock(&file->lock); 1950 } 1951 1952 static void 1953 __check_sync_reqs(struct spdk_file *file) 1954 { 1955 struct spdk_fs_request *sync_req; 1956 1957 pthread_spin_lock(&file->lock); 1958 1959 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1960 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1961 break; 1962 } 1963 } 1964 1965 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1966 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1967 sync_req->args.op.sync.xattr_in_progress = true; 1968 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1969 sizeof(file->length_flushed)); 1970 1971 pthread_spin_unlock(&file->lock); 1972 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file); 1973 } else { 1974 pthread_spin_unlock(&file->lock); 1975 } 1976 } 1977 1978 static void 1979 __file_flush_done(void *ctx, int bserrno) 1980 { 1981 struct spdk_fs_request *req = ctx; 1982 struct spdk_fs_cb_args *args = &req->args; 1983 struct spdk_file *file = args->file; 1984 struct cache_buffer *next = args->op.flush.cache_buffer; 1985 1986 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1987 1988 pthread_spin_lock(&file->lock); 1989 next->in_progress = false; 1990 next->bytes_flushed += args->op.flush.length; 1991 file->length_flushed += args->op.flush.length; 1992 if (file->length_flushed > file->length) { 1993 file->length = file->length_flushed; 1994 } 1995 if (next->bytes_flushed == next->buf_size) { 1996 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1997 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1998 } 1999 2000 /* 2001 * Assert that there is no cached data that extends past the end of the underlying 2002 * blob. 2003 */ 2004 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2005 next->bytes_filled == 0); 2006 2007 pthread_spin_unlock(&file->lock); 2008 2009 __check_sync_reqs(file); 2010 2011 __file_flush(req); 2012 } 2013 2014 static void 2015 __file_flush(void *ctx) 2016 { 2017 struct spdk_fs_request *req = ctx; 2018 struct spdk_fs_cb_args *args = &req->args; 2019 struct spdk_file *file = args->file; 2020 struct cache_buffer *next; 2021 uint64_t offset, length, start_lba, num_lba; 2022 uint32_t lba_size; 2023 2024 pthread_spin_lock(&file->lock); 2025 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 2026 if (next == NULL || next->in_progress) { 2027 /* 2028 * There is either no data to flush, or a flush I/O is already in 2029 * progress. So return immediately - if a flush I/O is in 2030 * progress we will flush more data after that is completed. 2031 */ 2032 free_fs_request(req); 2033 if (next == NULL) { 2034 /* 2035 * For cases where a file's cache was evicted, and then the 2036 * file was later appended, we will write the data directly 2037 * to disk and bypass cache. So just update length_flushed 2038 * here to reflect that all data was already written to disk. 2039 */ 2040 file->length_flushed = file->append_pos; 2041 } 2042 pthread_spin_unlock(&file->lock); 2043 if (next == NULL) { 2044 /* 2045 * There is no data to flush, but we still need to check for any 2046 * outstanding sync requests to make sure metadata gets updated. 2047 */ 2048 __check_sync_reqs(file); 2049 } 2050 return; 2051 } 2052 2053 offset = next->offset + next->bytes_flushed; 2054 length = next->bytes_filled - next->bytes_flushed; 2055 if (length == 0) { 2056 free_fs_request(req); 2057 pthread_spin_unlock(&file->lock); 2058 return; 2059 } 2060 args->op.flush.length = length; 2061 args->op.flush.cache_buffer = next; 2062 2063 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2064 2065 next->in_progress = true; 2066 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2067 offset, length, start_lba, num_lba); 2068 pthread_spin_unlock(&file->lock); 2069 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2070 next->buf + (start_lba * lba_size) - next->offset, 2071 start_lba, num_lba, __file_flush_done, req); 2072 } 2073 2074 static void 2075 __file_extend_done(void *arg, int bserrno) 2076 { 2077 struct spdk_fs_cb_args *args = arg; 2078 2079 __wake_caller(args, bserrno); 2080 } 2081 2082 static void 2083 __file_extend_resize_cb(void *_args, int bserrno) 2084 { 2085 struct spdk_fs_cb_args *args = _args; 2086 struct spdk_file *file = args->file; 2087 2088 if (bserrno) { 2089 __wake_caller(args, bserrno); 2090 return; 2091 } 2092 2093 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2094 } 2095 2096 static void 2097 __file_extend_blob(void *_args) 2098 { 2099 struct spdk_fs_cb_args *args = _args; 2100 struct spdk_file *file = args->file; 2101 2102 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2103 } 2104 2105 static void 2106 __rw_from_file_done(void *ctx, int bserrno) 2107 { 2108 struct spdk_fs_request *req = ctx; 2109 2110 __wake_caller(&req->args, bserrno); 2111 free_fs_request(req); 2112 } 2113 2114 static void 2115 __rw_from_file(void *ctx) 2116 { 2117 struct spdk_fs_request *req = ctx; 2118 struct spdk_fs_cb_args *args = &req->args; 2119 struct spdk_file *file = args->file; 2120 2121 if (args->op.rw.is_read) { 2122 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2123 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2124 __rw_from_file_done, req); 2125 } else { 2126 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2127 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2128 __rw_from_file_done, req); 2129 } 2130 } 2131 2132 static int 2133 __send_rw_from_file(struct spdk_file *file, void *payload, 2134 uint64_t offset, uint64_t length, bool is_read, 2135 struct spdk_fs_channel *channel) 2136 { 2137 struct spdk_fs_request *req; 2138 struct spdk_fs_cb_args *args; 2139 2140 req = alloc_fs_request_with_iov(channel, 1); 2141 if (req == NULL) { 2142 sem_post(&channel->sem); 2143 return -ENOMEM; 2144 } 2145 2146 args = &req->args; 2147 args->file = file; 2148 args->sem = &channel->sem; 2149 args->iovs[0].iov_base = payload; 2150 args->iovs[0].iov_len = (size_t)length; 2151 args->op.rw.offset = offset; 2152 args->op.rw.is_read = is_read; 2153 file->fs->send_request(__rw_from_file, req); 2154 return 0; 2155 } 2156 2157 int 2158 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2159 void *payload, uint64_t offset, uint64_t length) 2160 { 2161 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2162 struct spdk_fs_request *flush_req; 2163 uint64_t rem_length, copy, blob_size, cluster_sz; 2164 uint32_t cache_buffers_filled = 0; 2165 uint8_t *cur_payload; 2166 struct cache_buffer *last; 2167 2168 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2169 2170 if (length == 0) { 2171 return 0; 2172 } 2173 2174 if (offset != file->append_pos) { 2175 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2176 return -EINVAL; 2177 } 2178 2179 pthread_spin_lock(&file->lock); 2180 file->open_for_writing = true; 2181 2182 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2183 cache_append_buffer(file); 2184 } 2185 2186 if (file->last == NULL) { 2187 int rc; 2188 2189 file->append_pos += length; 2190 pthread_spin_unlock(&file->lock); 2191 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2192 sem_wait(&channel->sem); 2193 return rc; 2194 } 2195 2196 blob_size = __file_get_blob_size(file); 2197 2198 if ((offset + length) > blob_size) { 2199 struct spdk_fs_cb_args extend_args = {}; 2200 2201 cluster_sz = file->fs->bs_opts.cluster_sz; 2202 extend_args.sem = &channel->sem; 2203 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2204 extend_args.file = file; 2205 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2206 pthread_spin_unlock(&file->lock); 2207 file->fs->send_request(__file_extend_blob, &extend_args); 2208 sem_wait(&channel->sem); 2209 if (extend_args.rc) { 2210 return extend_args.rc; 2211 } 2212 } 2213 2214 flush_req = alloc_fs_request(channel); 2215 if (flush_req == NULL) { 2216 pthread_spin_unlock(&file->lock); 2217 return -ENOMEM; 2218 } 2219 2220 last = file->last; 2221 rem_length = length; 2222 cur_payload = payload; 2223 while (rem_length > 0) { 2224 copy = last->buf_size - last->bytes_filled; 2225 if (copy > rem_length) { 2226 copy = rem_length; 2227 } 2228 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2229 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2230 file->append_pos += copy; 2231 if (file->length < file->append_pos) { 2232 file->length = file->append_pos; 2233 } 2234 cur_payload += copy; 2235 last->bytes_filled += copy; 2236 rem_length -= copy; 2237 if (last->bytes_filled == last->buf_size) { 2238 cache_buffers_filled++; 2239 last = cache_append_buffer(file); 2240 if (last == NULL) { 2241 BLOBFS_TRACE(file, "nomem\n"); 2242 free_fs_request(flush_req); 2243 pthread_spin_unlock(&file->lock); 2244 return -ENOMEM; 2245 } 2246 } 2247 } 2248 2249 pthread_spin_unlock(&file->lock); 2250 2251 if (cache_buffers_filled == 0) { 2252 free_fs_request(flush_req); 2253 return 0; 2254 } 2255 2256 flush_req->args.file = file; 2257 file->fs->send_request(__file_flush, flush_req); 2258 return 0; 2259 } 2260 2261 static void 2262 __readahead_done(void *ctx, int bserrno) 2263 { 2264 struct spdk_fs_request *req = ctx; 2265 struct spdk_fs_cb_args *args = &req->args; 2266 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2267 struct spdk_file *file = args->file; 2268 2269 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2270 2271 pthread_spin_lock(&file->lock); 2272 cache_buffer->bytes_filled = args->op.readahead.length; 2273 cache_buffer->bytes_flushed = args->op.readahead.length; 2274 cache_buffer->in_progress = false; 2275 pthread_spin_unlock(&file->lock); 2276 2277 free_fs_request(req); 2278 } 2279 2280 static void 2281 __readahead(void *ctx) 2282 { 2283 struct spdk_fs_request *req = ctx; 2284 struct spdk_fs_cb_args *args = &req->args; 2285 struct spdk_file *file = args->file; 2286 uint64_t offset, length, start_lba, num_lba; 2287 uint32_t lba_size; 2288 2289 offset = args->op.readahead.offset; 2290 length = args->op.readahead.length; 2291 assert(length > 0); 2292 2293 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2294 2295 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2296 offset, length, start_lba, num_lba); 2297 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2298 args->op.readahead.cache_buffer->buf, 2299 start_lba, num_lba, __readahead_done, req); 2300 } 2301 2302 static uint64_t 2303 __next_cache_buffer_offset(uint64_t offset) 2304 { 2305 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2306 } 2307 2308 static void 2309 check_readahead(struct spdk_file *file, uint64_t offset, 2310 struct spdk_fs_channel *channel) 2311 { 2312 struct spdk_fs_request *req; 2313 struct spdk_fs_cb_args *args; 2314 2315 offset = __next_cache_buffer_offset(offset); 2316 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2317 return; 2318 } 2319 2320 req = alloc_fs_request(channel); 2321 if (req == NULL) { 2322 return; 2323 } 2324 args = &req->args; 2325 2326 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2327 2328 args->file = file; 2329 args->op.readahead.offset = offset; 2330 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2331 if (!args->op.readahead.cache_buffer) { 2332 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2333 free_fs_request(req); 2334 return; 2335 } 2336 2337 args->op.readahead.cache_buffer->in_progress = true; 2338 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2339 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2340 } else { 2341 args->op.readahead.length = CACHE_BUFFER_SIZE; 2342 } 2343 file->fs->send_request(__readahead, req); 2344 } 2345 2346 static int 2347 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, 2348 struct spdk_fs_channel *channel) 2349 { 2350 struct cache_buffer *buf; 2351 int rc; 2352 2353 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2354 if (buf == NULL) { 2355 pthread_spin_unlock(&file->lock); 2356 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2357 pthread_spin_lock(&file->lock); 2358 return rc; 2359 } 2360 2361 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2362 length = buf->offset + buf->bytes_filled - offset; 2363 } 2364 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2365 memcpy(payload, &buf->buf[offset - buf->offset], length); 2366 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2367 pthread_spin_lock(&g_caches_lock); 2368 spdk_tree_remove_buffer(file->tree, buf); 2369 if (file->tree->present_mask == 0) { 2370 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2371 } 2372 pthread_spin_unlock(&g_caches_lock); 2373 } 2374 2375 sem_post(&channel->sem); 2376 return 0; 2377 } 2378 2379 int64_t 2380 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2381 void *payload, uint64_t offset, uint64_t length) 2382 { 2383 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2384 uint64_t final_offset, final_length; 2385 uint32_t sub_reads = 0; 2386 int rc = 0; 2387 2388 pthread_spin_lock(&file->lock); 2389 2390 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2391 2392 file->open_for_writing = false; 2393 2394 if (length == 0 || offset >= file->append_pos) { 2395 pthread_spin_unlock(&file->lock); 2396 return 0; 2397 } 2398 2399 if (offset + length > file->append_pos) { 2400 length = file->append_pos - offset; 2401 } 2402 2403 if (offset != file->next_seq_offset) { 2404 file->seq_byte_count = 0; 2405 } 2406 file->seq_byte_count += length; 2407 file->next_seq_offset = offset + length; 2408 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2409 check_readahead(file, offset, channel); 2410 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2411 } 2412 2413 final_length = 0; 2414 final_offset = offset + length; 2415 while (offset < final_offset) { 2416 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2417 if (length > (final_offset - offset)) { 2418 length = final_offset - offset; 2419 } 2420 rc = __file_read(file, payload, offset, length, channel); 2421 if (rc == 0) { 2422 final_length += length; 2423 } else { 2424 break; 2425 } 2426 payload += length; 2427 offset += length; 2428 sub_reads++; 2429 } 2430 pthread_spin_unlock(&file->lock); 2431 while (sub_reads-- > 0) { 2432 sem_wait(&channel->sem); 2433 } 2434 if (rc == 0) { 2435 return final_length; 2436 } else { 2437 return rc; 2438 } 2439 } 2440 2441 static void 2442 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2443 spdk_file_op_complete cb_fn, void *cb_arg) 2444 { 2445 struct spdk_fs_request *sync_req; 2446 struct spdk_fs_request *flush_req; 2447 struct spdk_fs_cb_args *sync_args; 2448 struct spdk_fs_cb_args *flush_args; 2449 2450 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2451 2452 pthread_spin_lock(&file->lock); 2453 if (file->append_pos <= file->length_flushed) { 2454 BLOBFS_TRACE(file, "done - no data to flush\n"); 2455 pthread_spin_unlock(&file->lock); 2456 cb_fn(cb_arg, 0); 2457 return; 2458 } 2459 2460 sync_req = alloc_fs_request(channel); 2461 if (!sync_req) { 2462 pthread_spin_unlock(&file->lock); 2463 cb_fn(cb_arg, -ENOMEM); 2464 return; 2465 } 2466 sync_args = &sync_req->args; 2467 2468 flush_req = alloc_fs_request(channel); 2469 if (!flush_req) { 2470 pthread_spin_unlock(&file->lock); 2471 cb_fn(cb_arg, -ENOMEM); 2472 return; 2473 } 2474 flush_args = &flush_req->args; 2475 2476 sync_args->file = file; 2477 sync_args->fn.file_op = cb_fn; 2478 sync_args->arg = cb_arg; 2479 sync_args->op.sync.offset = file->append_pos; 2480 sync_args->op.sync.xattr_in_progress = false; 2481 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2482 pthread_spin_unlock(&file->lock); 2483 2484 flush_args->file = file; 2485 channel->send_request(__file_flush, flush_req); 2486 } 2487 2488 int 2489 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2490 { 2491 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2492 struct spdk_fs_cb_args args = {}; 2493 2494 args.sem = &channel->sem; 2495 _file_sync(file, channel, __wake_caller, &args); 2496 sem_wait(&channel->sem); 2497 2498 return args.rc; 2499 } 2500 2501 void 2502 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2503 spdk_file_op_complete cb_fn, void *cb_arg) 2504 { 2505 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2506 2507 _file_sync(file, channel, cb_fn, cb_arg); 2508 } 2509 2510 void 2511 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2512 { 2513 BLOBFS_TRACE(file, "priority=%u\n", priority); 2514 file->priority = priority; 2515 2516 } 2517 2518 /* 2519 * Close routines 2520 */ 2521 2522 static void 2523 __file_close_async_done(void *ctx, int bserrno) 2524 { 2525 struct spdk_fs_request *req = ctx; 2526 struct spdk_fs_cb_args *args = &req->args; 2527 struct spdk_file *file = args->file; 2528 2529 if (file->is_deleted) { 2530 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2531 return; 2532 } 2533 2534 args->fn.file_op(args->arg, bserrno); 2535 free_fs_request(req); 2536 } 2537 2538 static void 2539 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2540 { 2541 struct spdk_blob *blob; 2542 2543 pthread_spin_lock(&file->lock); 2544 if (file->ref_count == 0) { 2545 pthread_spin_unlock(&file->lock); 2546 __file_close_async_done(req, -EBADF); 2547 return; 2548 } 2549 2550 file->ref_count--; 2551 if (file->ref_count > 0) { 2552 pthread_spin_unlock(&file->lock); 2553 req->args.fn.file_op(req->args.arg, 0); 2554 free_fs_request(req); 2555 return; 2556 } 2557 2558 pthread_spin_unlock(&file->lock); 2559 2560 blob = file->blob; 2561 file->blob = NULL; 2562 spdk_blob_close(blob, __file_close_async_done, req); 2563 } 2564 2565 static void 2566 __file_close_async__sync_done(void *arg, int fserrno) 2567 { 2568 struct spdk_fs_request *req = arg; 2569 struct spdk_fs_cb_args *args = &req->args; 2570 2571 __file_close_async(args->file, req); 2572 } 2573 2574 void 2575 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2576 { 2577 struct spdk_fs_request *req; 2578 struct spdk_fs_cb_args *args; 2579 2580 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2581 if (req == NULL) { 2582 cb_fn(cb_arg, -ENOMEM); 2583 return; 2584 } 2585 2586 args = &req->args; 2587 args->file = file; 2588 args->fn.file_op = cb_fn; 2589 args->arg = cb_arg; 2590 2591 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2592 } 2593 2594 static void 2595 __file_close(void *arg) 2596 { 2597 struct spdk_fs_request *req = arg; 2598 struct spdk_fs_cb_args *args = &req->args; 2599 struct spdk_file *file = args->file; 2600 2601 __file_close_async(file, req); 2602 } 2603 2604 int 2605 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2606 { 2607 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2608 struct spdk_fs_request *req; 2609 struct spdk_fs_cb_args *args; 2610 2611 req = alloc_fs_request(channel); 2612 if (req == NULL) { 2613 return -ENOMEM; 2614 } 2615 2616 args = &req->args; 2617 2618 spdk_file_sync(file, ctx); 2619 BLOBFS_TRACE(file, "name=%s\n", file->name); 2620 args->file = file; 2621 args->sem = &channel->sem; 2622 args->fn.file_op = __wake_caller; 2623 args->arg = req; 2624 channel->send_request(__file_close, req); 2625 sem_wait(&channel->sem); 2626 2627 return args->rc; 2628 } 2629 2630 int 2631 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2632 { 2633 if (size < sizeof(spdk_blob_id)) { 2634 return -EINVAL; 2635 } 2636 2637 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2638 2639 return sizeof(spdk_blob_id); 2640 } 2641 2642 static void 2643 cache_free_buffers(struct spdk_file *file) 2644 { 2645 BLOBFS_TRACE(file, "free=%s\n", file->name); 2646 pthread_spin_lock(&file->lock); 2647 pthread_spin_lock(&g_caches_lock); 2648 if (file->tree->present_mask == 0) { 2649 pthread_spin_unlock(&g_caches_lock); 2650 pthread_spin_unlock(&file->lock); 2651 return; 2652 } 2653 spdk_tree_free_buffers(file->tree); 2654 2655 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2656 /* If not freed, put it in the end of the queue */ 2657 if (file->tree->present_mask != 0) { 2658 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2659 } 2660 file->last = NULL; 2661 pthread_spin_unlock(&g_caches_lock); 2662 pthread_spin_unlock(&file->lock); 2663 } 2664 2665 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2666 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2667