1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 void 64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 65 { 66 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 67 free(cache_buffer); 68 } 69 70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 71 72 struct spdk_file { 73 struct spdk_filesystem *fs; 74 struct spdk_blob *blob; 75 char *name; 76 uint64_t length; 77 bool is_deleted; 78 bool open_for_writing; 79 uint64_t length_flushed; 80 uint64_t append_pos; 81 uint64_t seq_byte_count; 82 uint64_t next_seq_offset; 83 uint32_t priority; 84 TAILQ_ENTRY(spdk_file) tailq; 85 spdk_blob_id blobid; 86 uint32_t ref_count; 87 pthread_spinlock_t lock; 88 struct cache_buffer *last; 89 struct cache_tree *tree; 90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 92 TAILQ_ENTRY(spdk_file) cache_tailq; 93 }; 94 95 struct spdk_deleted_file { 96 spdk_blob_id id; 97 TAILQ_ENTRY(spdk_deleted_file) tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 struct iovec *iovs; 138 uint32_t iovcnt; 139 struct iovec iov; 140 union { 141 struct { 142 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 143 } fs_load; 144 struct { 145 uint64_t length; 146 } truncate; 147 struct { 148 struct spdk_io_channel *channel; 149 void *pin_buf; 150 int is_read; 151 off_t offset; 152 size_t length; 153 uint64_t start_lba; 154 uint64_t num_lba; 155 uint32_t blocklen; 156 } rw; 157 struct { 158 const char *old_name; 159 const char *new_name; 160 } rename; 161 struct { 162 struct cache_buffer *cache_buffer; 163 uint64_t length; 164 } flush; 165 struct { 166 struct cache_buffer *cache_buffer; 167 uint64_t length; 168 uint64_t offset; 169 } readahead; 170 struct { 171 uint64_t offset; 172 TAILQ_ENTRY(spdk_fs_request) tailq; 173 bool xattr_in_progress; 174 } sync; 175 struct { 176 uint32_t num_clusters; 177 } resize; 178 struct { 179 const char *name; 180 uint32_t flags; 181 TAILQ_ENTRY(spdk_fs_request) tailq; 182 } open; 183 struct { 184 const char *name; 185 struct spdk_blob *blob; 186 } create; 187 struct { 188 const char *name; 189 } delete; 190 struct { 191 const char *name; 192 } stat; 193 } op; 194 }; 195 196 static void cache_free_buffers(struct spdk_file *file); 197 static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs); 198 static void spdk_fs_free_io_channels(struct spdk_filesystem *fs); 199 200 void 201 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 202 { 203 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 204 } 205 206 static void 207 __initialize_cache(void) 208 { 209 assert(g_cache_pool == NULL); 210 211 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 212 g_fs_cache_size / CACHE_BUFFER_SIZE, 213 CACHE_BUFFER_SIZE, 214 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 215 SPDK_ENV_SOCKET_ID_ANY); 216 if (!g_cache_pool) { 217 SPDK_ERRLOG("Create mempool failed, you may " 218 "increase the memory and try again\n"); 219 assert(false); 220 } 221 TAILQ_INIT(&g_caches); 222 pthread_spin_init(&g_caches_lock, 0); 223 } 224 225 static void 226 __free_cache(void) 227 { 228 assert(g_cache_pool != NULL); 229 230 spdk_mempool_free(g_cache_pool); 231 g_cache_pool = NULL; 232 } 233 234 static uint64_t 235 __file_get_blob_size(struct spdk_file *file) 236 { 237 uint64_t cluster_sz; 238 239 cluster_sz = file->fs->bs_opts.cluster_sz; 240 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 241 } 242 243 struct spdk_fs_request { 244 struct spdk_fs_cb_args args; 245 TAILQ_ENTRY(spdk_fs_request) link; 246 struct spdk_fs_channel *channel; 247 }; 248 249 struct spdk_fs_channel { 250 struct spdk_fs_request *req_mem; 251 TAILQ_HEAD(, spdk_fs_request) reqs; 252 sem_t sem; 253 struct spdk_filesystem *fs; 254 struct spdk_io_channel *bs_channel; 255 fs_send_request_fn send_request; 256 bool sync; 257 uint32_t outstanding_reqs; 258 pthread_spinlock_t lock; 259 }; 260 261 /* For now, this is effectively an alias. But eventually we'll shift 262 * some data members over. */ 263 struct spdk_fs_thread_ctx { 264 struct spdk_fs_channel ch; 265 }; 266 267 static struct spdk_fs_request * 268 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 269 { 270 struct spdk_fs_request *req; 271 struct iovec *iovs = NULL; 272 273 if (iovcnt > 1) { 274 iovs = calloc(iovcnt, sizeof(struct iovec)); 275 if (!iovs) { 276 return NULL; 277 } 278 } 279 280 if (channel->sync) { 281 pthread_spin_lock(&channel->lock); 282 } 283 284 req = TAILQ_FIRST(&channel->reqs); 285 if (req) { 286 channel->outstanding_reqs++; 287 TAILQ_REMOVE(&channel->reqs, req, link); 288 } 289 290 if (channel->sync) { 291 pthread_spin_unlock(&channel->lock); 292 } 293 294 if (req == NULL) { 295 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 296 free(iovs); 297 return NULL; 298 } 299 memset(req, 0, sizeof(*req)); 300 req->channel = channel; 301 if (iovcnt > 1) { 302 req->args.iovs = iovs; 303 } else { 304 req->args.iovs = &req->args.iov; 305 } 306 req->args.iovcnt = iovcnt; 307 308 return req; 309 } 310 311 static struct spdk_fs_request * 312 alloc_fs_request(struct spdk_fs_channel *channel) 313 { 314 return alloc_fs_request_with_iov(channel, 0); 315 } 316 317 static void 318 free_fs_request(struct spdk_fs_request *req) 319 { 320 struct spdk_fs_channel *channel = req->channel; 321 322 if (req->args.iovcnt > 1) { 323 free(req->args.iovs); 324 } 325 326 if (channel->sync) { 327 pthread_spin_lock(&channel->lock); 328 } 329 330 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 331 channel->outstanding_reqs--; 332 333 if (channel->sync) { 334 pthread_spin_unlock(&channel->lock); 335 } 336 } 337 338 static int 339 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 340 uint32_t max_ops) 341 { 342 uint32_t i; 343 344 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 345 if (!channel->req_mem) { 346 return -1; 347 } 348 349 channel->outstanding_reqs = 0; 350 TAILQ_INIT(&channel->reqs); 351 sem_init(&channel->sem, 0, 0); 352 353 for (i = 0; i < max_ops; i++) { 354 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 355 } 356 357 channel->fs = fs; 358 359 return 0; 360 } 361 362 static int 363 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 364 { 365 struct spdk_filesystem *fs; 366 struct spdk_fs_channel *channel = ctx_buf; 367 368 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 369 370 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 371 } 372 373 static int 374 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 375 { 376 struct spdk_filesystem *fs; 377 struct spdk_fs_channel *channel = ctx_buf; 378 379 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 380 381 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 382 } 383 384 static int 385 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 386 { 387 struct spdk_filesystem *fs; 388 struct spdk_fs_channel *channel = ctx_buf; 389 390 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 391 392 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 393 } 394 395 static void 396 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 397 { 398 struct spdk_fs_channel *channel = ctx_buf; 399 400 if (channel->outstanding_reqs > 0) { 401 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 402 channel->outstanding_reqs); 403 } 404 405 free(channel->req_mem); 406 if (channel->bs_channel != NULL) { 407 spdk_bs_free_io_channel(channel->bs_channel); 408 } 409 } 410 411 static void 412 __send_request_direct(fs_request_fn fn, void *arg) 413 { 414 fn(arg); 415 } 416 417 static void 418 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 419 { 420 fs->bs = bs; 421 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 422 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 423 fs->md_target.md_fs_channel->send_request = __send_request_direct; 424 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 425 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 426 427 pthread_mutex_lock(&g_cache_init_lock); 428 if (g_fs_count == 0) { 429 __initialize_cache(); 430 } 431 g_fs_count++; 432 pthread_mutex_unlock(&g_cache_init_lock); 433 } 434 435 static void 436 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 437 { 438 struct spdk_fs_request *req = ctx; 439 struct spdk_fs_cb_args *args = &req->args; 440 struct spdk_filesystem *fs = args->fs; 441 442 if (bserrno == 0) { 443 common_fs_bs_init(fs, bs); 444 } else { 445 free(fs); 446 fs = NULL; 447 } 448 449 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 450 free_fs_request(req); 451 } 452 453 static void 454 fs_conf_parse(void) 455 { 456 struct spdk_conf_section *sp; 457 458 sp = spdk_conf_find_section(NULL, "Blobfs"); 459 if (sp == NULL) { 460 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 461 return; 462 } 463 464 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 465 if (g_fs_cache_buffer_shift <= 0) { 466 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 467 } 468 } 469 470 static struct spdk_filesystem * 471 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 472 { 473 struct spdk_filesystem *fs; 474 475 fs = calloc(1, sizeof(*fs)); 476 if (fs == NULL) { 477 return NULL; 478 } 479 480 fs->bdev = dev; 481 fs->send_request = send_request_fn; 482 TAILQ_INIT(&fs->files); 483 484 fs->md_target.max_ops = 512; 485 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 486 sizeof(struct spdk_fs_channel), "blobfs_md"); 487 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 488 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 489 490 fs->sync_target.max_ops = 512; 491 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 492 sizeof(struct spdk_fs_channel), "blobfs_sync"); 493 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 494 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 495 496 fs->io_target.max_ops = 512; 497 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 498 sizeof(struct spdk_fs_channel), "blobfs_io"); 499 500 return fs; 501 } 502 503 static void 504 __wake_caller(void *arg, int fserrno) 505 { 506 struct spdk_fs_cb_args *args = arg; 507 508 args->rc = fserrno; 509 sem_post(args->sem); 510 } 511 512 void 513 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 514 fs_send_request_fn send_request_fn, 515 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 516 { 517 struct spdk_filesystem *fs; 518 struct spdk_fs_request *req; 519 struct spdk_fs_cb_args *args; 520 struct spdk_bs_opts opts = {}; 521 522 fs = fs_alloc(dev, send_request_fn); 523 if (fs == NULL) { 524 cb_fn(cb_arg, NULL, -ENOMEM); 525 return; 526 } 527 528 fs_conf_parse(); 529 530 req = alloc_fs_request(fs->md_target.md_fs_channel); 531 if (req == NULL) { 532 spdk_fs_free_io_channels(fs); 533 spdk_fs_io_device_unregister(fs); 534 cb_fn(cb_arg, NULL, -ENOMEM); 535 return; 536 } 537 538 args = &req->args; 539 args->fn.fs_op_with_handle = cb_fn; 540 args->arg = cb_arg; 541 args->fs = fs; 542 543 spdk_bs_opts_init(&opts); 544 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 545 if (opt) { 546 opts.cluster_sz = opt->cluster_sz; 547 } 548 spdk_bs_init(dev, &opts, init_cb, req); 549 } 550 551 static struct spdk_file * 552 file_alloc(struct spdk_filesystem *fs) 553 { 554 struct spdk_file *file; 555 556 file = calloc(1, sizeof(*file)); 557 if (file == NULL) { 558 return NULL; 559 } 560 561 file->tree = calloc(1, sizeof(*file->tree)); 562 if (file->tree == NULL) { 563 free(file); 564 return NULL; 565 } 566 567 file->fs = fs; 568 TAILQ_INIT(&file->open_requests); 569 TAILQ_INIT(&file->sync_requests); 570 pthread_spin_init(&file->lock, 0); 571 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 572 file->priority = SPDK_FILE_PRIORITY_LOW; 573 return file; 574 } 575 576 static void fs_load_done(void *ctx, int bserrno); 577 578 static int 579 _handle_deleted_files(struct spdk_fs_request *req) 580 { 581 struct spdk_fs_cb_args *args = &req->args; 582 struct spdk_filesystem *fs = args->fs; 583 584 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 585 struct spdk_deleted_file *deleted_file; 586 587 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 588 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 589 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 590 free(deleted_file); 591 return 0; 592 } 593 594 return 1; 595 } 596 597 static void 598 fs_load_done(void *ctx, int bserrno) 599 { 600 struct spdk_fs_request *req = ctx; 601 struct spdk_fs_cb_args *args = &req->args; 602 struct spdk_filesystem *fs = args->fs; 603 604 /* The filesystem has been loaded. Now check if there are any files that 605 * were marked for deletion before last unload. Do not complete the 606 * fs_load callback until all of them have been deleted on disk. 607 */ 608 if (_handle_deleted_files(req) == 0) { 609 /* We found a file that's been marked for deleting but not actually 610 * deleted yet. This function will get called again once the delete 611 * operation is completed. 612 */ 613 return; 614 } 615 616 args->fn.fs_op_with_handle(args->arg, fs, 0); 617 free_fs_request(req); 618 619 } 620 621 static void 622 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 623 { 624 struct spdk_fs_request *req = ctx; 625 struct spdk_fs_cb_args *args = &req->args; 626 struct spdk_filesystem *fs = args->fs; 627 uint64_t *length; 628 const char *name; 629 uint32_t *is_deleted; 630 size_t value_len; 631 632 if (rc < 0) { 633 args->fn.fs_op_with_handle(args->arg, fs, rc); 634 free_fs_request(req); 635 return; 636 } 637 638 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 639 if (rc < 0) { 640 args->fn.fs_op_with_handle(args->arg, fs, rc); 641 free_fs_request(req); 642 return; 643 } 644 645 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 646 if (rc < 0) { 647 args->fn.fs_op_with_handle(args->arg, fs, rc); 648 free_fs_request(req); 649 return; 650 } 651 652 assert(value_len == 8); 653 654 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 655 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 656 if (rc < 0) { 657 struct spdk_file *f; 658 659 f = file_alloc(fs); 660 if (f == NULL) { 661 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 662 free_fs_request(req); 663 return; 664 } 665 666 f->name = strdup(name); 667 f->blobid = spdk_blob_get_id(blob); 668 f->length = *length; 669 f->length_flushed = *length; 670 f->append_pos = *length; 671 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 672 } else { 673 struct spdk_deleted_file *deleted_file; 674 675 deleted_file = calloc(1, sizeof(*deleted_file)); 676 if (deleted_file == NULL) { 677 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 678 free_fs_request(req); 679 return; 680 } 681 deleted_file->id = spdk_blob_get_id(blob); 682 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 683 } 684 } 685 686 static void 687 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 688 { 689 struct spdk_fs_request *req = ctx; 690 struct spdk_fs_cb_args *args = &req->args; 691 struct spdk_filesystem *fs = args->fs; 692 struct spdk_bs_type bstype; 693 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 694 static const struct spdk_bs_type zeros; 695 696 if (bserrno != 0) { 697 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 698 free_fs_request(req); 699 free(fs); 700 return; 701 } 702 703 bstype = spdk_bs_get_bstype(bs); 704 705 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 706 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 707 spdk_bs_set_bstype(bs, blobfs_type); 708 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 709 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 710 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 711 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 712 free_fs_request(req); 713 free(fs); 714 return; 715 } 716 717 common_fs_bs_init(fs, bs); 718 fs_load_done(req, 0); 719 } 720 721 static void 722 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 723 { 724 assert(fs != NULL); 725 spdk_io_device_unregister(&fs->md_target, NULL); 726 spdk_io_device_unregister(&fs->sync_target, NULL); 727 spdk_io_device_unregister(&fs->io_target, NULL); 728 free(fs); 729 } 730 731 static void 732 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 733 { 734 assert(fs != NULL); 735 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 736 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 737 } 738 739 void 740 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 741 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 742 { 743 struct spdk_filesystem *fs; 744 struct spdk_fs_cb_args *args; 745 struct spdk_fs_request *req; 746 struct spdk_bs_opts bs_opts; 747 748 fs = fs_alloc(dev, send_request_fn); 749 if (fs == NULL) { 750 cb_fn(cb_arg, NULL, -ENOMEM); 751 return; 752 } 753 754 fs_conf_parse(); 755 756 req = alloc_fs_request(fs->md_target.md_fs_channel); 757 if (req == NULL) { 758 spdk_fs_free_io_channels(fs); 759 spdk_fs_io_device_unregister(fs); 760 cb_fn(cb_arg, NULL, -ENOMEM); 761 return; 762 } 763 764 args = &req->args; 765 args->fn.fs_op_with_handle = cb_fn; 766 args->arg = cb_arg; 767 args->fs = fs; 768 TAILQ_INIT(&args->op.fs_load.deleted_files); 769 spdk_bs_opts_init(&bs_opts); 770 bs_opts.iter_cb_fn = iter_cb; 771 bs_opts.iter_cb_arg = req; 772 spdk_bs_load(dev, &bs_opts, load_cb, req); 773 } 774 775 static void 776 unload_cb(void *ctx, int bserrno) 777 { 778 struct spdk_fs_request *req = ctx; 779 struct spdk_fs_cb_args *args = &req->args; 780 struct spdk_filesystem *fs = args->fs; 781 struct spdk_file *file, *tmp; 782 783 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 784 TAILQ_REMOVE(&fs->files, file, tailq); 785 cache_free_buffers(file); 786 free(file->name); 787 free(file->tree); 788 free(file); 789 } 790 791 pthread_mutex_lock(&g_cache_init_lock); 792 g_fs_count--; 793 if (g_fs_count == 0) { 794 __free_cache(); 795 } 796 pthread_mutex_unlock(&g_cache_init_lock); 797 798 args->fn.fs_op(args->arg, bserrno); 799 free(req); 800 801 spdk_fs_io_device_unregister(fs); 802 } 803 804 void 805 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 806 { 807 struct spdk_fs_request *req; 808 struct spdk_fs_cb_args *args; 809 810 /* 811 * We must free the md_channel before unloading the blobstore, so just 812 * allocate this request from the general heap. 813 */ 814 req = calloc(1, sizeof(*req)); 815 if (req == NULL) { 816 cb_fn(cb_arg, -ENOMEM); 817 return; 818 } 819 820 args = &req->args; 821 args->fn.fs_op = cb_fn; 822 args->arg = cb_arg; 823 args->fs = fs; 824 825 spdk_fs_free_io_channels(fs); 826 spdk_bs_unload(fs->bs, unload_cb, req); 827 } 828 829 static struct spdk_file * 830 fs_find_file(struct spdk_filesystem *fs, const char *name) 831 { 832 struct spdk_file *file; 833 834 TAILQ_FOREACH(file, &fs->files, tailq) { 835 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 836 return file; 837 } 838 } 839 840 return NULL; 841 } 842 843 void 844 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 845 spdk_file_stat_op_complete cb_fn, void *cb_arg) 846 { 847 struct spdk_file_stat stat; 848 struct spdk_file *f = NULL; 849 850 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 851 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 852 return; 853 } 854 855 f = fs_find_file(fs, name); 856 if (f != NULL) { 857 stat.blobid = f->blobid; 858 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 859 cb_fn(cb_arg, &stat, 0); 860 return; 861 } 862 863 cb_fn(cb_arg, NULL, -ENOENT); 864 } 865 866 static void 867 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 868 { 869 struct spdk_fs_request *req = arg; 870 struct spdk_fs_cb_args *args = &req->args; 871 872 args->rc = fserrno; 873 if (fserrno == 0) { 874 memcpy(args->arg, stat, sizeof(*stat)); 875 } 876 sem_post(args->sem); 877 } 878 879 static void 880 __file_stat(void *arg) 881 { 882 struct spdk_fs_request *req = arg; 883 struct spdk_fs_cb_args *args = &req->args; 884 885 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 886 args->fn.stat_op, req); 887 } 888 889 int 890 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 891 const char *name, struct spdk_file_stat *stat) 892 { 893 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 894 struct spdk_fs_request *req; 895 int rc; 896 897 req = alloc_fs_request(channel); 898 if (req == NULL) { 899 return -ENOMEM; 900 } 901 902 req->args.fs = fs; 903 req->args.op.stat.name = name; 904 req->args.fn.stat_op = __copy_stat; 905 req->args.arg = stat; 906 req->args.sem = &channel->sem; 907 channel->send_request(__file_stat, req); 908 sem_wait(&channel->sem); 909 910 rc = req->args.rc; 911 free_fs_request(req); 912 913 return rc; 914 } 915 916 static void 917 fs_create_blob_close_cb(void *ctx, int bserrno) 918 { 919 int rc; 920 struct spdk_fs_request *req = ctx; 921 struct spdk_fs_cb_args *args = &req->args; 922 923 rc = args->rc ? args->rc : bserrno; 924 args->fn.file_op(args->arg, rc); 925 free_fs_request(req); 926 } 927 928 static void 929 fs_create_blob_resize_cb(void *ctx, int bserrno) 930 { 931 struct spdk_fs_request *req = ctx; 932 struct spdk_fs_cb_args *args = &req->args; 933 struct spdk_file *f = args->file; 934 struct spdk_blob *blob = args->op.create.blob; 935 uint64_t length = 0; 936 937 args->rc = bserrno; 938 if (bserrno) { 939 spdk_blob_close(blob, fs_create_blob_close_cb, args); 940 return; 941 } 942 943 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 944 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 945 946 spdk_blob_close(blob, fs_create_blob_close_cb, args); 947 } 948 949 static void 950 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 951 { 952 struct spdk_fs_request *req = ctx; 953 struct spdk_fs_cb_args *args = &req->args; 954 955 if (bserrno) { 956 args->fn.file_op(args->arg, bserrno); 957 free_fs_request(req); 958 return; 959 } 960 961 args->op.create.blob = blob; 962 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 963 } 964 965 static void 966 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 967 { 968 struct spdk_fs_request *req = ctx; 969 struct spdk_fs_cb_args *args = &req->args; 970 struct spdk_file *f = args->file; 971 972 if (bserrno) { 973 args->fn.file_op(args->arg, bserrno); 974 free_fs_request(req); 975 return; 976 } 977 978 f->blobid = blobid; 979 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 980 } 981 982 void 983 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 984 spdk_file_op_complete cb_fn, void *cb_arg) 985 { 986 struct spdk_file *file; 987 struct spdk_fs_request *req; 988 struct spdk_fs_cb_args *args; 989 990 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 991 cb_fn(cb_arg, -ENAMETOOLONG); 992 return; 993 } 994 995 file = fs_find_file(fs, name); 996 if (file != NULL) { 997 cb_fn(cb_arg, -EEXIST); 998 return; 999 } 1000 1001 file = file_alloc(fs); 1002 if (file == NULL) { 1003 cb_fn(cb_arg, -ENOMEM); 1004 return; 1005 } 1006 1007 req = alloc_fs_request(fs->md_target.md_fs_channel); 1008 if (req == NULL) { 1009 cb_fn(cb_arg, -ENOMEM); 1010 return; 1011 } 1012 1013 args = &req->args; 1014 args->file = file; 1015 args->fn.file_op = cb_fn; 1016 args->arg = cb_arg; 1017 1018 file->name = strdup(name); 1019 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1020 } 1021 1022 static void 1023 __fs_create_file_done(void *arg, int fserrno) 1024 { 1025 struct spdk_fs_request *req = arg; 1026 struct spdk_fs_cb_args *args = &req->args; 1027 1028 args->rc = fserrno; 1029 sem_post(args->sem); 1030 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1031 } 1032 1033 static void 1034 __fs_create_file(void *arg) 1035 { 1036 struct spdk_fs_request *req = arg; 1037 struct spdk_fs_cb_args *args = &req->args; 1038 1039 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1040 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1041 } 1042 1043 int 1044 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1045 { 1046 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1047 struct spdk_fs_request *req; 1048 struct spdk_fs_cb_args *args; 1049 int rc; 1050 1051 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1052 1053 req = alloc_fs_request(channel); 1054 if (req == NULL) { 1055 return -ENOMEM; 1056 } 1057 1058 args = &req->args; 1059 args->fs = fs; 1060 args->op.create.name = name; 1061 args->sem = &channel->sem; 1062 fs->send_request(__fs_create_file, req); 1063 sem_wait(&channel->sem); 1064 rc = args->rc; 1065 free_fs_request(req); 1066 1067 return rc; 1068 } 1069 1070 static void 1071 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1072 { 1073 struct spdk_fs_request *req = ctx; 1074 struct spdk_fs_cb_args *args = &req->args; 1075 struct spdk_file *f = args->file; 1076 1077 f->blob = blob; 1078 while (!TAILQ_EMPTY(&f->open_requests)) { 1079 req = TAILQ_FIRST(&f->open_requests); 1080 args = &req->args; 1081 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1082 args->fn.file_op_with_handle(args->arg, f, bserrno); 1083 free_fs_request(req); 1084 } 1085 } 1086 1087 static void 1088 fs_open_blob_create_cb(void *ctx, int bserrno) 1089 { 1090 struct spdk_fs_request *req = ctx; 1091 struct spdk_fs_cb_args *args = &req->args; 1092 struct spdk_file *file = args->file; 1093 struct spdk_filesystem *fs = args->fs; 1094 1095 if (file == NULL) { 1096 /* 1097 * This is from an open with CREATE flag - the file 1098 * is now created so look it up in the file list for this 1099 * filesystem. 1100 */ 1101 file = fs_find_file(fs, args->op.open.name); 1102 assert(file != NULL); 1103 args->file = file; 1104 } 1105 1106 file->ref_count++; 1107 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1108 if (file->ref_count == 1) { 1109 assert(file->blob == NULL); 1110 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1111 } else if (file->blob != NULL) { 1112 fs_open_blob_done(req, file->blob, 0); 1113 } else { 1114 /* 1115 * The blob open for this file is in progress due to a previous 1116 * open request. When that open completes, it will invoke the 1117 * open callback for this request. 1118 */ 1119 } 1120 } 1121 1122 void 1123 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1124 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1125 { 1126 struct spdk_file *f = NULL; 1127 struct spdk_fs_request *req; 1128 struct spdk_fs_cb_args *args; 1129 1130 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1131 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1132 return; 1133 } 1134 1135 f = fs_find_file(fs, name); 1136 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1137 cb_fn(cb_arg, NULL, -ENOENT); 1138 return; 1139 } 1140 1141 if (f != NULL && f->is_deleted == true) { 1142 cb_fn(cb_arg, NULL, -ENOENT); 1143 return; 1144 } 1145 1146 req = alloc_fs_request(fs->md_target.md_fs_channel); 1147 if (req == NULL) { 1148 cb_fn(cb_arg, NULL, -ENOMEM); 1149 return; 1150 } 1151 1152 args = &req->args; 1153 args->fn.file_op_with_handle = cb_fn; 1154 args->arg = cb_arg; 1155 args->file = f; 1156 args->fs = fs; 1157 args->op.open.name = name; 1158 1159 if (f == NULL) { 1160 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1161 } else { 1162 fs_open_blob_create_cb(req, 0); 1163 } 1164 } 1165 1166 static void 1167 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1168 { 1169 struct spdk_fs_request *req = arg; 1170 struct spdk_fs_cb_args *args = &req->args; 1171 1172 args->file = file; 1173 __wake_caller(args, bserrno); 1174 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1175 } 1176 1177 static void 1178 __fs_open_file(void *arg) 1179 { 1180 struct spdk_fs_request *req = arg; 1181 struct spdk_fs_cb_args *args = &req->args; 1182 1183 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1184 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1185 __fs_open_file_done, req); 1186 } 1187 1188 int 1189 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1190 const char *name, uint32_t flags, struct spdk_file **file) 1191 { 1192 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1193 struct spdk_fs_request *req; 1194 struct spdk_fs_cb_args *args; 1195 int rc; 1196 1197 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1198 1199 req = alloc_fs_request(channel); 1200 if (req == NULL) { 1201 return -ENOMEM; 1202 } 1203 1204 args = &req->args; 1205 args->fs = fs; 1206 args->op.open.name = name; 1207 args->op.open.flags = flags; 1208 args->sem = &channel->sem; 1209 fs->send_request(__fs_open_file, req); 1210 sem_wait(&channel->sem); 1211 rc = args->rc; 1212 if (rc == 0) { 1213 *file = args->file; 1214 } else { 1215 *file = NULL; 1216 } 1217 free_fs_request(req); 1218 1219 return rc; 1220 } 1221 1222 static void 1223 fs_rename_blob_close_cb(void *ctx, int bserrno) 1224 { 1225 struct spdk_fs_request *req = ctx; 1226 struct spdk_fs_cb_args *args = &req->args; 1227 1228 args->fn.fs_op(args->arg, bserrno); 1229 free_fs_request(req); 1230 } 1231 1232 static void 1233 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1234 { 1235 struct spdk_fs_request *req = ctx; 1236 struct spdk_fs_cb_args *args = &req->args; 1237 const char *new_name = args->op.rename.new_name; 1238 1239 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1240 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1241 } 1242 1243 static void 1244 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1245 { 1246 struct spdk_fs_cb_args *args = &req->args; 1247 struct spdk_file *f; 1248 1249 f = fs_find_file(args->fs, args->op.rename.old_name); 1250 if (f == NULL) { 1251 args->fn.fs_op(args->arg, -ENOENT); 1252 free_fs_request(req); 1253 return; 1254 } 1255 1256 free(f->name); 1257 f->name = strdup(args->op.rename.new_name); 1258 args->file = f; 1259 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1260 } 1261 1262 static void 1263 fs_rename_delete_done(void *arg, int fserrno) 1264 { 1265 __spdk_fs_md_rename_file(arg); 1266 } 1267 1268 void 1269 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1270 const char *old_name, const char *new_name, 1271 spdk_file_op_complete cb_fn, void *cb_arg) 1272 { 1273 struct spdk_file *f; 1274 struct spdk_fs_request *req; 1275 struct spdk_fs_cb_args *args; 1276 1277 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1278 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1279 cb_fn(cb_arg, -ENAMETOOLONG); 1280 return; 1281 } 1282 1283 req = alloc_fs_request(fs->md_target.md_fs_channel); 1284 if (req == NULL) { 1285 cb_fn(cb_arg, -ENOMEM); 1286 return; 1287 } 1288 1289 args = &req->args; 1290 args->fn.fs_op = cb_fn; 1291 args->fs = fs; 1292 args->arg = cb_arg; 1293 args->op.rename.old_name = old_name; 1294 args->op.rename.new_name = new_name; 1295 1296 f = fs_find_file(fs, new_name); 1297 if (f == NULL) { 1298 __spdk_fs_md_rename_file(req); 1299 return; 1300 } 1301 1302 /* 1303 * The rename overwrites an existing file. So delete the existing file, then 1304 * do the actual rename. 1305 */ 1306 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1307 } 1308 1309 static void 1310 __fs_rename_file_done(void *arg, int fserrno) 1311 { 1312 struct spdk_fs_request *req = arg; 1313 struct spdk_fs_cb_args *args = &req->args; 1314 1315 __wake_caller(args, fserrno); 1316 } 1317 1318 static void 1319 __fs_rename_file(void *arg) 1320 { 1321 struct spdk_fs_request *req = arg; 1322 struct spdk_fs_cb_args *args = &req->args; 1323 1324 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1325 __fs_rename_file_done, req); 1326 } 1327 1328 int 1329 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1330 const char *old_name, const char *new_name) 1331 { 1332 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1333 struct spdk_fs_request *req; 1334 struct spdk_fs_cb_args *args; 1335 int rc; 1336 1337 req = alloc_fs_request(channel); 1338 if (req == NULL) { 1339 return -ENOMEM; 1340 } 1341 1342 args = &req->args; 1343 1344 args->fs = fs; 1345 args->op.rename.old_name = old_name; 1346 args->op.rename.new_name = new_name; 1347 args->sem = &channel->sem; 1348 fs->send_request(__fs_rename_file, req); 1349 sem_wait(&channel->sem); 1350 rc = args->rc; 1351 free_fs_request(req); 1352 return rc; 1353 } 1354 1355 static void 1356 blob_delete_cb(void *ctx, int bserrno) 1357 { 1358 struct spdk_fs_request *req = ctx; 1359 struct spdk_fs_cb_args *args = &req->args; 1360 1361 args->fn.file_op(args->arg, bserrno); 1362 free_fs_request(req); 1363 } 1364 1365 void 1366 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1367 spdk_file_op_complete cb_fn, void *cb_arg) 1368 { 1369 struct spdk_file *f; 1370 spdk_blob_id blobid; 1371 struct spdk_fs_request *req; 1372 struct spdk_fs_cb_args *args; 1373 1374 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1375 1376 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1377 cb_fn(cb_arg, -ENAMETOOLONG); 1378 return; 1379 } 1380 1381 f = fs_find_file(fs, name); 1382 if (f == NULL) { 1383 cb_fn(cb_arg, -ENOENT); 1384 return; 1385 } 1386 1387 req = alloc_fs_request(fs->md_target.md_fs_channel); 1388 if (req == NULL) { 1389 cb_fn(cb_arg, -ENOMEM); 1390 return; 1391 } 1392 1393 args = &req->args; 1394 args->fn.file_op = cb_fn; 1395 args->arg = cb_arg; 1396 1397 if (f->ref_count > 0) { 1398 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1399 f->is_deleted = true; 1400 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1401 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1402 return; 1403 } 1404 1405 TAILQ_REMOVE(&fs->files, f, tailq); 1406 1407 cache_free_buffers(f); 1408 1409 blobid = f->blobid; 1410 1411 free(f->name); 1412 free(f->tree); 1413 free(f); 1414 1415 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1416 } 1417 1418 static void 1419 __fs_delete_file_done(void *arg, int fserrno) 1420 { 1421 struct spdk_fs_request *req = arg; 1422 struct spdk_fs_cb_args *args = &req->args; 1423 1424 __wake_caller(args, fserrno); 1425 } 1426 1427 static void 1428 __fs_delete_file(void *arg) 1429 { 1430 struct spdk_fs_request *req = arg; 1431 struct spdk_fs_cb_args *args = &req->args; 1432 1433 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1434 } 1435 1436 int 1437 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1438 const char *name) 1439 { 1440 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1441 struct spdk_fs_request *req; 1442 struct spdk_fs_cb_args *args; 1443 int rc; 1444 1445 req = alloc_fs_request(channel); 1446 if (req == NULL) { 1447 return -ENOMEM; 1448 } 1449 1450 args = &req->args; 1451 args->fs = fs; 1452 args->op.delete.name = name; 1453 args->sem = &channel->sem; 1454 fs->send_request(__fs_delete_file, req); 1455 sem_wait(&channel->sem); 1456 rc = args->rc; 1457 free_fs_request(req); 1458 1459 return rc; 1460 } 1461 1462 spdk_fs_iter 1463 spdk_fs_iter_first(struct spdk_filesystem *fs) 1464 { 1465 struct spdk_file *f; 1466 1467 f = TAILQ_FIRST(&fs->files); 1468 return f; 1469 } 1470 1471 spdk_fs_iter 1472 spdk_fs_iter_next(spdk_fs_iter iter) 1473 { 1474 struct spdk_file *f = iter; 1475 1476 if (f == NULL) { 1477 return NULL; 1478 } 1479 1480 f = TAILQ_NEXT(f, tailq); 1481 return f; 1482 } 1483 1484 const char * 1485 spdk_file_get_name(struct spdk_file *file) 1486 { 1487 return file->name; 1488 } 1489 1490 uint64_t 1491 spdk_file_get_length(struct spdk_file *file) 1492 { 1493 uint64_t length; 1494 1495 assert(file != NULL); 1496 1497 length = file->append_pos >= file->length ? file->append_pos : file->length; 1498 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1499 return length; 1500 } 1501 1502 static void 1503 fs_truncate_complete_cb(void *ctx, int bserrno) 1504 { 1505 struct spdk_fs_request *req = ctx; 1506 struct spdk_fs_cb_args *args = &req->args; 1507 1508 args->fn.file_op(args->arg, bserrno); 1509 free_fs_request(req); 1510 } 1511 1512 static void 1513 fs_truncate_resize_cb(void *ctx, int bserrno) 1514 { 1515 struct spdk_fs_request *req = ctx; 1516 struct spdk_fs_cb_args *args = &req->args; 1517 struct spdk_file *file = args->file; 1518 uint64_t *length = &args->op.truncate.length; 1519 1520 if (bserrno) { 1521 args->fn.file_op(args->arg, bserrno); 1522 free_fs_request(req); 1523 return; 1524 } 1525 1526 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1527 1528 file->length = *length; 1529 if (file->append_pos > file->length) { 1530 file->append_pos = file->length; 1531 } 1532 1533 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1534 } 1535 1536 static uint64_t 1537 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1538 { 1539 return (length + cluster_sz - 1) / cluster_sz; 1540 } 1541 1542 void 1543 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1544 spdk_file_op_complete cb_fn, void *cb_arg) 1545 { 1546 struct spdk_filesystem *fs; 1547 size_t num_clusters; 1548 struct spdk_fs_request *req; 1549 struct spdk_fs_cb_args *args; 1550 1551 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1552 if (length == file->length) { 1553 cb_fn(cb_arg, 0); 1554 return; 1555 } 1556 1557 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1558 if (req == NULL) { 1559 cb_fn(cb_arg, -ENOMEM); 1560 return; 1561 } 1562 1563 args = &req->args; 1564 args->fn.file_op = cb_fn; 1565 args->arg = cb_arg; 1566 args->file = file; 1567 args->op.truncate.length = length; 1568 fs = file->fs; 1569 1570 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1571 1572 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1573 } 1574 1575 static void 1576 __truncate(void *arg) 1577 { 1578 struct spdk_fs_request *req = arg; 1579 struct spdk_fs_cb_args *args = &req->args; 1580 1581 spdk_file_truncate_async(args->file, args->op.truncate.length, 1582 args->fn.file_op, args); 1583 } 1584 1585 int 1586 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1587 uint64_t length) 1588 { 1589 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1590 struct spdk_fs_request *req; 1591 struct spdk_fs_cb_args *args; 1592 int rc; 1593 1594 req = alloc_fs_request(channel); 1595 if (req == NULL) { 1596 return -ENOMEM; 1597 } 1598 1599 args = &req->args; 1600 1601 args->file = file; 1602 args->op.truncate.length = length; 1603 args->fn.file_op = __wake_caller; 1604 args->sem = &channel->sem; 1605 1606 channel->send_request(__truncate, req); 1607 sem_wait(&channel->sem); 1608 rc = args->rc; 1609 free_fs_request(req); 1610 1611 return rc; 1612 } 1613 1614 static void 1615 __rw_done(void *ctx, int bserrno) 1616 { 1617 struct spdk_fs_request *req = ctx; 1618 struct spdk_fs_cb_args *args = &req->args; 1619 1620 spdk_free(args->op.rw.pin_buf); 1621 args->fn.file_op(args->arg, bserrno); 1622 free_fs_request(req); 1623 } 1624 1625 static void 1626 __read_done(void *ctx, int bserrno) 1627 { 1628 struct spdk_fs_request *req = ctx; 1629 struct spdk_fs_cb_args *args = &req->args; 1630 1631 assert(req != NULL); 1632 if (args->op.rw.is_read) { 1633 memcpy(args->iovs[0].iov_base, 1634 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1635 args->iovs[0].iov_len); 1636 __rw_done(req, 0); 1637 } else { 1638 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1639 args->iovs[0].iov_base, 1640 args->iovs[0].iov_len); 1641 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1642 args->op.rw.pin_buf, 1643 args->op.rw.start_lba, args->op.rw.num_lba, 1644 __rw_done, req); 1645 } 1646 } 1647 1648 static void 1649 __do_blob_read(void *ctx, int fserrno) 1650 { 1651 struct spdk_fs_request *req = ctx; 1652 struct spdk_fs_cb_args *args = &req->args; 1653 1654 if (fserrno) { 1655 __rw_done(req, fserrno); 1656 return; 1657 } 1658 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1659 args->op.rw.pin_buf, 1660 args->op.rw.start_lba, args->op.rw.num_lba, 1661 __read_done, req); 1662 } 1663 1664 static void 1665 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1666 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1667 { 1668 uint64_t end_lba; 1669 1670 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1671 *start_lba = offset / *lba_size; 1672 end_lba = (offset + length - 1) / *lba_size; 1673 *num_lba = (end_lba - *start_lba + 1); 1674 } 1675 1676 static void 1677 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1678 void *payload, uint64_t offset, uint64_t length, 1679 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1680 { 1681 struct spdk_fs_request *req; 1682 struct spdk_fs_cb_args *args; 1683 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1684 uint64_t start_lba, num_lba, pin_buf_length; 1685 uint32_t lba_size; 1686 1687 if (is_read && offset + length > file->length) { 1688 cb_fn(cb_arg, -EINVAL); 1689 return; 1690 } 1691 1692 req = alloc_fs_request_with_iov(channel, 1); 1693 if (req == NULL) { 1694 cb_fn(cb_arg, -ENOMEM); 1695 return; 1696 } 1697 1698 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1699 1700 args = &req->args; 1701 args->fn.file_op = cb_fn; 1702 args->arg = cb_arg; 1703 args->file = file; 1704 args->op.rw.channel = channel->bs_channel; 1705 args->iovs[0].iov_base = payload; 1706 args->iovs[0].iov_len = (size_t)length; 1707 args->op.rw.is_read = is_read; 1708 args->op.rw.offset = offset; 1709 args->op.rw.blocklen = lba_size; 1710 1711 pin_buf_length = num_lba * lba_size; 1712 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1713 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1714 if (args->op.rw.pin_buf == NULL) { 1715 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1716 file->name, offset, length); 1717 free_fs_request(req); 1718 cb_fn(cb_arg, -ENOMEM); 1719 return; 1720 } 1721 1722 args->op.rw.start_lba = start_lba; 1723 args->op.rw.num_lba = num_lba; 1724 1725 if (!is_read && file->length < offset + length) { 1726 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1727 } else { 1728 __do_blob_read(req, 0); 1729 } 1730 } 1731 1732 void 1733 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1734 void *payload, uint64_t offset, uint64_t length, 1735 spdk_file_op_complete cb_fn, void *cb_arg) 1736 { 1737 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1738 } 1739 1740 void 1741 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1742 void *payload, uint64_t offset, uint64_t length, 1743 spdk_file_op_complete cb_fn, void *cb_arg) 1744 { 1745 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1746 file->name, offset, length); 1747 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1748 } 1749 1750 struct spdk_io_channel * 1751 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1752 { 1753 struct spdk_io_channel *io_channel; 1754 struct spdk_fs_channel *fs_channel; 1755 1756 io_channel = spdk_get_io_channel(&fs->io_target); 1757 fs_channel = spdk_io_channel_get_ctx(io_channel); 1758 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1759 fs_channel->send_request = __send_request_direct; 1760 1761 return io_channel; 1762 } 1763 1764 void 1765 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1766 { 1767 spdk_put_io_channel(channel); 1768 } 1769 1770 struct spdk_fs_thread_ctx * 1771 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1772 { 1773 struct spdk_fs_thread_ctx *ctx; 1774 1775 ctx = calloc(1, sizeof(*ctx)); 1776 if (!ctx) { 1777 return NULL; 1778 } 1779 1780 _spdk_fs_channel_create(fs, &ctx->ch, 512); 1781 1782 ctx->ch.send_request = fs->send_request; 1783 ctx->ch.sync = 1; 1784 pthread_spin_init(&ctx->ch.lock, 0); 1785 1786 return ctx; 1787 } 1788 1789 1790 void 1791 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1792 { 1793 assert(ctx->ch.sync == 1); 1794 1795 while (true) { 1796 pthread_spin_lock(&ctx->ch.lock); 1797 if (ctx->ch.outstanding_reqs == 0) { 1798 pthread_spin_unlock(&ctx->ch.lock); 1799 break; 1800 } 1801 pthread_spin_unlock(&ctx->ch.lock); 1802 usleep(1000); 1803 } 1804 1805 _spdk_fs_channel_destroy(NULL, &ctx->ch); 1806 free(ctx); 1807 } 1808 1809 void 1810 spdk_fs_set_cache_size(uint64_t size_in_mb) 1811 { 1812 g_fs_cache_size = size_in_mb * 1024 * 1024; 1813 } 1814 1815 uint64_t 1816 spdk_fs_get_cache_size(void) 1817 { 1818 return g_fs_cache_size / (1024 * 1024); 1819 } 1820 1821 static void __file_flush(void *ctx); 1822 1823 static void * 1824 alloc_cache_memory_buffer(struct spdk_file *context) 1825 { 1826 struct spdk_file *file; 1827 void *buf; 1828 1829 buf = spdk_mempool_get(g_cache_pool); 1830 if (buf != NULL) { 1831 return buf; 1832 } 1833 1834 pthread_spin_lock(&g_caches_lock); 1835 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1836 if (!file->open_for_writing && 1837 file->priority == SPDK_FILE_PRIORITY_LOW && 1838 file != context) { 1839 break; 1840 } 1841 } 1842 pthread_spin_unlock(&g_caches_lock); 1843 if (file != NULL) { 1844 cache_free_buffers(file); 1845 buf = spdk_mempool_get(g_cache_pool); 1846 if (buf != NULL) { 1847 return buf; 1848 } 1849 } 1850 1851 pthread_spin_lock(&g_caches_lock); 1852 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1853 if (!file->open_for_writing && file != context) { 1854 break; 1855 } 1856 } 1857 pthread_spin_unlock(&g_caches_lock); 1858 if (file != NULL) { 1859 cache_free_buffers(file); 1860 buf = spdk_mempool_get(g_cache_pool); 1861 if (buf != NULL) { 1862 return buf; 1863 } 1864 } 1865 1866 pthread_spin_lock(&g_caches_lock); 1867 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1868 if (file != context) { 1869 break; 1870 } 1871 } 1872 pthread_spin_unlock(&g_caches_lock); 1873 if (file != NULL) { 1874 cache_free_buffers(file); 1875 buf = spdk_mempool_get(g_cache_pool); 1876 if (buf != NULL) { 1877 return buf; 1878 } 1879 } 1880 1881 return NULL; 1882 } 1883 1884 static struct cache_buffer * 1885 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1886 { 1887 struct cache_buffer *buf; 1888 int count = 0; 1889 1890 buf = calloc(1, sizeof(*buf)); 1891 if (buf == NULL) { 1892 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1893 return NULL; 1894 } 1895 1896 buf->buf = alloc_cache_memory_buffer(file); 1897 while (buf->buf == NULL) { 1898 /* 1899 * TODO: alloc_cache_memory_buffer() should eventually free 1900 * some buffers. Need a more sophisticated check here, instead 1901 * of just bailing if 100 tries does not result in getting a 1902 * free buffer. This will involve using the sync channel's 1903 * semaphore to block until a buffer becomes available. 1904 */ 1905 if (count++ == 100) { 1906 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 1907 file, offset); 1908 free(buf); 1909 return NULL; 1910 } 1911 buf->buf = alloc_cache_memory_buffer(file); 1912 } 1913 1914 buf->buf_size = CACHE_BUFFER_SIZE; 1915 buf->offset = offset; 1916 1917 pthread_spin_lock(&g_caches_lock); 1918 if (file->tree->present_mask == 0) { 1919 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1920 } 1921 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1922 pthread_spin_unlock(&g_caches_lock); 1923 1924 return buf; 1925 } 1926 1927 static struct cache_buffer * 1928 cache_append_buffer(struct spdk_file *file) 1929 { 1930 struct cache_buffer *last; 1931 1932 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1933 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1934 1935 last = cache_insert_buffer(file, file->append_pos); 1936 if (last == NULL) { 1937 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1938 return NULL; 1939 } 1940 1941 file->last = last; 1942 1943 return last; 1944 } 1945 1946 static void __check_sync_reqs(struct spdk_file *file); 1947 1948 static void 1949 __file_cache_finish_sync(void *ctx, int bserrno) 1950 { 1951 struct spdk_file *file = ctx; 1952 struct spdk_fs_request *sync_req; 1953 struct spdk_fs_cb_args *sync_args; 1954 1955 pthread_spin_lock(&file->lock); 1956 sync_req = TAILQ_FIRST(&file->sync_requests); 1957 sync_args = &sync_req->args; 1958 assert(sync_args->op.sync.offset <= file->length_flushed); 1959 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1960 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1961 pthread_spin_unlock(&file->lock); 1962 1963 sync_args->fn.file_op(sync_args->arg, bserrno); 1964 __check_sync_reqs(file); 1965 1966 pthread_spin_lock(&file->lock); 1967 free_fs_request(sync_req); 1968 pthread_spin_unlock(&file->lock); 1969 } 1970 1971 static void 1972 __check_sync_reqs(struct spdk_file *file) 1973 { 1974 struct spdk_fs_request *sync_req; 1975 1976 pthread_spin_lock(&file->lock); 1977 1978 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1979 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1980 break; 1981 } 1982 } 1983 1984 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1985 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1986 sync_req->args.op.sync.xattr_in_progress = true; 1987 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1988 sizeof(file->length_flushed)); 1989 1990 pthread_spin_unlock(&file->lock); 1991 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file); 1992 } else { 1993 pthread_spin_unlock(&file->lock); 1994 } 1995 } 1996 1997 static void 1998 __file_flush_done(void *ctx, int bserrno) 1999 { 2000 struct spdk_fs_request *req = ctx; 2001 struct spdk_fs_cb_args *args = &req->args; 2002 struct spdk_file *file = args->file; 2003 struct cache_buffer *next = args->op.flush.cache_buffer; 2004 2005 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 2006 2007 pthread_spin_lock(&file->lock); 2008 next->in_progress = false; 2009 next->bytes_flushed += args->op.flush.length; 2010 file->length_flushed += args->op.flush.length; 2011 if (file->length_flushed > file->length) { 2012 file->length = file->length_flushed; 2013 } 2014 if (next->bytes_flushed == next->buf_size) { 2015 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 2016 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 2017 } 2018 2019 /* 2020 * Assert that there is no cached data that extends past the end of the underlying 2021 * blob. 2022 */ 2023 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2024 next->bytes_filled == 0); 2025 2026 pthread_spin_unlock(&file->lock); 2027 2028 __check_sync_reqs(file); 2029 2030 __file_flush(req); 2031 } 2032 2033 static void 2034 __file_flush(void *ctx) 2035 { 2036 struct spdk_fs_request *req = ctx; 2037 struct spdk_fs_cb_args *args = &req->args; 2038 struct spdk_file *file = args->file; 2039 struct cache_buffer *next; 2040 uint64_t offset, length, start_lba, num_lba; 2041 uint32_t lba_size; 2042 2043 pthread_spin_lock(&file->lock); 2044 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 2045 if (next == NULL || next->in_progress) { 2046 /* 2047 * There is either no data to flush, or a flush I/O is already in 2048 * progress. So return immediately - if a flush I/O is in 2049 * progress we will flush more data after that is completed. 2050 */ 2051 free_fs_request(req); 2052 if (next == NULL) { 2053 /* 2054 * For cases where a file's cache was evicted, and then the 2055 * file was later appended, we will write the data directly 2056 * to disk and bypass cache. So just update length_flushed 2057 * here to reflect that all data was already written to disk. 2058 */ 2059 file->length_flushed = file->append_pos; 2060 } 2061 pthread_spin_unlock(&file->lock); 2062 if (next == NULL) { 2063 /* 2064 * There is no data to flush, but we still need to check for any 2065 * outstanding sync requests to make sure metadata gets updated. 2066 */ 2067 __check_sync_reqs(file); 2068 } 2069 return; 2070 } 2071 2072 offset = next->offset + next->bytes_flushed; 2073 length = next->bytes_filled - next->bytes_flushed; 2074 if (length == 0) { 2075 free_fs_request(req); 2076 pthread_spin_unlock(&file->lock); 2077 return; 2078 } 2079 args->op.flush.length = length; 2080 args->op.flush.cache_buffer = next; 2081 2082 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2083 2084 next->in_progress = true; 2085 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2086 offset, length, start_lba, num_lba); 2087 pthread_spin_unlock(&file->lock); 2088 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2089 next->buf + (start_lba * lba_size) - next->offset, 2090 start_lba, num_lba, __file_flush_done, req); 2091 } 2092 2093 static void 2094 __file_extend_done(void *arg, int bserrno) 2095 { 2096 struct spdk_fs_cb_args *args = arg; 2097 2098 __wake_caller(args, bserrno); 2099 } 2100 2101 static void 2102 __file_extend_resize_cb(void *_args, int bserrno) 2103 { 2104 struct spdk_fs_cb_args *args = _args; 2105 struct spdk_file *file = args->file; 2106 2107 if (bserrno) { 2108 __wake_caller(args, bserrno); 2109 return; 2110 } 2111 2112 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2113 } 2114 2115 static void 2116 __file_extend_blob(void *_args) 2117 { 2118 struct spdk_fs_cb_args *args = _args; 2119 struct spdk_file *file = args->file; 2120 2121 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2122 } 2123 2124 static void 2125 __rw_from_file_done(void *ctx, int bserrno) 2126 { 2127 struct spdk_fs_request *req = ctx; 2128 2129 __wake_caller(&req->args, bserrno); 2130 free_fs_request(req); 2131 } 2132 2133 static void 2134 __rw_from_file(void *ctx) 2135 { 2136 struct spdk_fs_request *req = ctx; 2137 struct spdk_fs_cb_args *args = &req->args; 2138 struct spdk_file *file = args->file; 2139 2140 if (args->op.rw.is_read) { 2141 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2142 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2143 __rw_from_file_done, req); 2144 } else { 2145 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2146 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2147 __rw_from_file_done, req); 2148 } 2149 } 2150 2151 static int 2152 __send_rw_from_file(struct spdk_file *file, void *payload, 2153 uint64_t offset, uint64_t length, bool is_read, 2154 struct spdk_fs_channel *channel) 2155 { 2156 struct spdk_fs_request *req; 2157 struct spdk_fs_cb_args *args; 2158 2159 req = alloc_fs_request_with_iov(channel, 1); 2160 if (req == NULL) { 2161 sem_post(&channel->sem); 2162 return -ENOMEM; 2163 } 2164 2165 args = &req->args; 2166 args->file = file; 2167 args->sem = &channel->sem; 2168 args->iovs[0].iov_base = payload; 2169 args->iovs[0].iov_len = (size_t)length; 2170 args->op.rw.offset = offset; 2171 args->op.rw.is_read = is_read; 2172 file->fs->send_request(__rw_from_file, req); 2173 return 0; 2174 } 2175 2176 int 2177 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2178 void *payload, uint64_t offset, uint64_t length) 2179 { 2180 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2181 struct spdk_fs_request *flush_req; 2182 uint64_t rem_length, copy, blob_size, cluster_sz; 2183 uint32_t cache_buffers_filled = 0; 2184 uint8_t *cur_payload; 2185 struct cache_buffer *last; 2186 2187 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2188 2189 if (length == 0) { 2190 return 0; 2191 } 2192 2193 if (offset != file->append_pos) { 2194 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2195 return -EINVAL; 2196 } 2197 2198 pthread_spin_lock(&file->lock); 2199 file->open_for_writing = true; 2200 2201 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2202 cache_append_buffer(file); 2203 } 2204 2205 if (file->last == NULL) { 2206 int rc; 2207 2208 file->append_pos += length; 2209 pthread_spin_unlock(&file->lock); 2210 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2211 sem_wait(&channel->sem); 2212 return rc; 2213 } 2214 2215 blob_size = __file_get_blob_size(file); 2216 2217 if ((offset + length) > blob_size) { 2218 struct spdk_fs_cb_args extend_args = {}; 2219 2220 cluster_sz = file->fs->bs_opts.cluster_sz; 2221 extend_args.sem = &channel->sem; 2222 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2223 extend_args.file = file; 2224 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2225 pthread_spin_unlock(&file->lock); 2226 file->fs->send_request(__file_extend_blob, &extend_args); 2227 sem_wait(&channel->sem); 2228 if (extend_args.rc) { 2229 return extend_args.rc; 2230 } 2231 } 2232 2233 flush_req = alloc_fs_request(channel); 2234 if (flush_req == NULL) { 2235 pthread_spin_unlock(&file->lock); 2236 return -ENOMEM; 2237 } 2238 2239 last = file->last; 2240 rem_length = length; 2241 cur_payload = payload; 2242 while (rem_length > 0) { 2243 copy = last->buf_size - last->bytes_filled; 2244 if (copy > rem_length) { 2245 copy = rem_length; 2246 } 2247 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2248 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2249 file->append_pos += copy; 2250 if (file->length < file->append_pos) { 2251 file->length = file->append_pos; 2252 } 2253 cur_payload += copy; 2254 last->bytes_filled += copy; 2255 rem_length -= copy; 2256 if (last->bytes_filled == last->buf_size) { 2257 cache_buffers_filled++; 2258 last = cache_append_buffer(file); 2259 if (last == NULL) { 2260 BLOBFS_TRACE(file, "nomem\n"); 2261 free_fs_request(flush_req); 2262 pthread_spin_unlock(&file->lock); 2263 return -ENOMEM; 2264 } 2265 } 2266 } 2267 2268 pthread_spin_unlock(&file->lock); 2269 2270 if (cache_buffers_filled == 0) { 2271 free_fs_request(flush_req); 2272 return 0; 2273 } 2274 2275 flush_req->args.file = file; 2276 file->fs->send_request(__file_flush, flush_req); 2277 return 0; 2278 } 2279 2280 static void 2281 __readahead_done(void *ctx, int bserrno) 2282 { 2283 struct spdk_fs_request *req = ctx; 2284 struct spdk_fs_cb_args *args = &req->args; 2285 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2286 struct spdk_file *file = args->file; 2287 2288 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2289 2290 pthread_spin_lock(&file->lock); 2291 cache_buffer->bytes_filled = args->op.readahead.length; 2292 cache_buffer->bytes_flushed = args->op.readahead.length; 2293 cache_buffer->in_progress = false; 2294 pthread_spin_unlock(&file->lock); 2295 2296 free_fs_request(req); 2297 } 2298 2299 static void 2300 __readahead(void *ctx) 2301 { 2302 struct spdk_fs_request *req = ctx; 2303 struct spdk_fs_cb_args *args = &req->args; 2304 struct spdk_file *file = args->file; 2305 uint64_t offset, length, start_lba, num_lba; 2306 uint32_t lba_size; 2307 2308 offset = args->op.readahead.offset; 2309 length = args->op.readahead.length; 2310 assert(length > 0); 2311 2312 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2313 2314 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2315 offset, length, start_lba, num_lba); 2316 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2317 args->op.readahead.cache_buffer->buf, 2318 start_lba, num_lba, __readahead_done, req); 2319 } 2320 2321 static uint64_t 2322 __next_cache_buffer_offset(uint64_t offset) 2323 { 2324 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2325 } 2326 2327 static void 2328 check_readahead(struct spdk_file *file, uint64_t offset, 2329 struct spdk_fs_channel *channel) 2330 { 2331 struct spdk_fs_request *req; 2332 struct spdk_fs_cb_args *args; 2333 2334 offset = __next_cache_buffer_offset(offset); 2335 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2336 return; 2337 } 2338 2339 req = alloc_fs_request(channel); 2340 if (req == NULL) { 2341 return; 2342 } 2343 args = &req->args; 2344 2345 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2346 2347 args->file = file; 2348 args->op.readahead.offset = offset; 2349 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2350 if (!args->op.readahead.cache_buffer) { 2351 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2352 free_fs_request(req); 2353 return; 2354 } 2355 2356 args->op.readahead.cache_buffer->in_progress = true; 2357 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2358 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2359 } else { 2360 args->op.readahead.length = CACHE_BUFFER_SIZE; 2361 } 2362 file->fs->send_request(__readahead, req); 2363 } 2364 2365 static int 2366 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, 2367 struct spdk_fs_channel *channel) 2368 { 2369 struct cache_buffer *buf; 2370 int rc; 2371 2372 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2373 if (buf == NULL) { 2374 pthread_spin_unlock(&file->lock); 2375 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2376 pthread_spin_lock(&file->lock); 2377 return rc; 2378 } 2379 2380 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2381 length = buf->offset + buf->bytes_filled - offset; 2382 } 2383 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2384 memcpy(payload, &buf->buf[offset - buf->offset], length); 2385 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2386 pthread_spin_lock(&g_caches_lock); 2387 spdk_tree_remove_buffer(file->tree, buf); 2388 if (file->tree->present_mask == 0) { 2389 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2390 } 2391 pthread_spin_unlock(&g_caches_lock); 2392 } 2393 2394 sem_post(&channel->sem); 2395 return 0; 2396 } 2397 2398 int64_t 2399 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2400 void *payload, uint64_t offset, uint64_t length) 2401 { 2402 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2403 uint64_t final_offset, final_length; 2404 uint32_t sub_reads = 0; 2405 int rc = 0; 2406 2407 pthread_spin_lock(&file->lock); 2408 2409 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2410 2411 file->open_for_writing = false; 2412 2413 if (length == 0 || offset >= file->append_pos) { 2414 pthread_spin_unlock(&file->lock); 2415 return 0; 2416 } 2417 2418 if (offset + length > file->append_pos) { 2419 length = file->append_pos - offset; 2420 } 2421 2422 if (offset != file->next_seq_offset) { 2423 file->seq_byte_count = 0; 2424 } 2425 file->seq_byte_count += length; 2426 file->next_seq_offset = offset + length; 2427 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2428 check_readahead(file, offset, channel); 2429 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2430 } 2431 2432 final_length = 0; 2433 final_offset = offset + length; 2434 while (offset < final_offset) { 2435 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2436 if (length > (final_offset - offset)) { 2437 length = final_offset - offset; 2438 } 2439 rc = __file_read(file, payload, offset, length, channel); 2440 if (rc == 0) { 2441 final_length += length; 2442 } else { 2443 break; 2444 } 2445 payload += length; 2446 offset += length; 2447 sub_reads++; 2448 } 2449 pthread_spin_unlock(&file->lock); 2450 while (sub_reads-- > 0) { 2451 sem_wait(&channel->sem); 2452 } 2453 if (rc == 0) { 2454 return final_length; 2455 } else { 2456 return rc; 2457 } 2458 } 2459 2460 static void 2461 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2462 spdk_file_op_complete cb_fn, void *cb_arg) 2463 { 2464 struct spdk_fs_request *sync_req; 2465 struct spdk_fs_request *flush_req; 2466 struct spdk_fs_cb_args *sync_args; 2467 struct spdk_fs_cb_args *flush_args; 2468 2469 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2470 2471 pthread_spin_lock(&file->lock); 2472 if (file->append_pos <= file->length_flushed) { 2473 BLOBFS_TRACE(file, "done - no data to flush\n"); 2474 pthread_spin_unlock(&file->lock); 2475 cb_fn(cb_arg, 0); 2476 return; 2477 } 2478 2479 sync_req = alloc_fs_request(channel); 2480 if (!sync_req) { 2481 pthread_spin_unlock(&file->lock); 2482 cb_fn(cb_arg, -ENOMEM); 2483 return; 2484 } 2485 sync_args = &sync_req->args; 2486 2487 flush_req = alloc_fs_request(channel); 2488 if (!flush_req) { 2489 pthread_spin_unlock(&file->lock); 2490 cb_fn(cb_arg, -ENOMEM); 2491 return; 2492 } 2493 flush_args = &flush_req->args; 2494 2495 sync_args->file = file; 2496 sync_args->fn.file_op = cb_fn; 2497 sync_args->arg = cb_arg; 2498 sync_args->op.sync.offset = file->append_pos; 2499 sync_args->op.sync.xattr_in_progress = false; 2500 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2501 pthread_spin_unlock(&file->lock); 2502 2503 flush_args->file = file; 2504 channel->send_request(__file_flush, flush_req); 2505 } 2506 2507 int 2508 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2509 { 2510 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2511 struct spdk_fs_cb_args args = {}; 2512 2513 args.sem = &channel->sem; 2514 _file_sync(file, channel, __wake_caller, &args); 2515 sem_wait(&channel->sem); 2516 2517 return args.rc; 2518 } 2519 2520 void 2521 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2522 spdk_file_op_complete cb_fn, void *cb_arg) 2523 { 2524 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2525 2526 _file_sync(file, channel, cb_fn, cb_arg); 2527 } 2528 2529 void 2530 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2531 { 2532 BLOBFS_TRACE(file, "priority=%u\n", priority); 2533 file->priority = priority; 2534 2535 } 2536 2537 /* 2538 * Close routines 2539 */ 2540 2541 static void 2542 __file_close_async_done(void *ctx, int bserrno) 2543 { 2544 struct spdk_fs_request *req = ctx; 2545 struct spdk_fs_cb_args *args = &req->args; 2546 struct spdk_file *file = args->file; 2547 2548 if (file->is_deleted) { 2549 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2550 return; 2551 } 2552 2553 args->fn.file_op(args->arg, bserrno); 2554 free_fs_request(req); 2555 } 2556 2557 static void 2558 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2559 { 2560 struct spdk_blob *blob; 2561 2562 pthread_spin_lock(&file->lock); 2563 if (file->ref_count == 0) { 2564 pthread_spin_unlock(&file->lock); 2565 __file_close_async_done(req, -EBADF); 2566 return; 2567 } 2568 2569 file->ref_count--; 2570 if (file->ref_count > 0) { 2571 pthread_spin_unlock(&file->lock); 2572 req->args.fn.file_op(req->args.arg, 0); 2573 free_fs_request(req); 2574 return; 2575 } 2576 2577 pthread_spin_unlock(&file->lock); 2578 2579 blob = file->blob; 2580 file->blob = NULL; 2581 spdk_blob_close(blob, __file_close_async_done, req); 2582 } 2583 2584 static void 2585 __file_close_async__sync_done(void *arg, int fserrno) 2586 { 2587 struct spdk_fs_request *req = arg; 2588 struct spdk_fs_cb_args *args = &req->args; 2589 2590 __file_close_async(args->file, req); 2591 } 2592 2593 void 2594 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2595 { 2596 struct spdk_fs_request *req; 2597 struct spdk_fs_cb_args *args; 2598 2599 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2600 if (req == NULL) { 2601 cb_fn(cb_arg, -ENOMEM); 2602 return; 2603 } 2604 2605 args = &req->args; 2606 args->file = file; 2607 args->fn.file_op = cb_fn; 2608 args->arg = cb_arg; 2609 2610 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2611 } 2612 2613 static void 2614 __file_close(void *arg) 2615 { 2616 struct spdk_fs_request *req = arg; 2617 struct spdk_fs_cb_args *args = &req->args; 2618 struct spdk_file *file = args->file; 2619 2620 __file_close_async(file, req); 2621 } 2622 2623 int 2624 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2625 { 2626 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2627 struct spdk_fs_request *req; 2628 struct spdk_fs_cb_args *args; 2629 2630 req = alloc_fs_request(channel); 2631 if (req == NULL) { 2632 return -ENOMEM; 2633 } 2634 2635 args = &req->args; 2636 2637 spdk_file_sync(file, ctx); 2638 BLOBFS_TRACE(file, "name=%s\n", file->name); 2639 args->file = file; 2640 args->sem = &channel->sem; 2641 args->fn.file_op = __wake_caller; 2642 args->arg = req; 2643 channel->send_request(__file_close, req); 2644 sem_wait(&channel->sem); 2645 2646 return args->rc; 2647 } 2648 2649 int 2650 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2651 { 2652 if (size < sizeof(spdk_blob_id)) { 2653 return -EINVAL; 2654 } 2655 2656 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2657 2658 return sizeof(spdk_blob_id); 2659 } 2660 2661 static void 2662 cache_free_buffers(struct spdk_file *file) 2663 { 2664 BLOBFS_TRACE(file, "free=%s\n", file->name); 2665 pthread_spin_lock(&file->lock); 2666 pthread_spin_lock(&g_caches_lock); 2667 if (file->tree->present_mask == 0) { 2668 pthread_spin_unlock(&g_caches_lock); 2669 pthread_spin_unlock(&file->lock); 2670 return; 2671 } 2672 spdk_tree_free_buffers(file->tree); 2673 2674 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2675 /* If not freed, put it in the end of the queue */ 2676 if (file->tree->present_mask != 0) { 2677 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2678 } 2679 file->last = NULL; 2680 pthread_spin_unlock(&g_caches_lock); 2681 pthread_spin_unlock(&file->lock); 2682 } 2683 2684 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2685 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2686