1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 void 64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 65 { 66 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 67 free(cache_buffer); 68 } 69 70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 71 72 struct spdk_file { 73 struct spdk_filesystem *fs; 74 struct spdk_blob *blob; 75 char *name; 76 uint64_t length; 77 bool is_deleted; 78 bool open_for_writing; 79 uint64_t length_flushed; 80 uint64_t append_pos; 81 uint64_t seq_byte_count; 82 uint64_t next_seq_offset; 83 uint32_t priority; 84 TAILQ_ENTRY(spdk_file) tailq; 85 spdk_blob_id blobid; 86 uint32_t ref_count; 87 pthread_spinlock_t lock; 88 struct cache_buffer *last; 89 struct cache_tree *tree; 90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 92 TAILQ_ENTRY(spdk_file) cache_tailq; 93 }; 94 95 struct spdk_deleted_file { 96 spdk_blob_id id; 97 TAILQ_ENTRY(spdk_deleted_file) tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 struct iovec *iovs; 138 uint32_t iovcnt; 139 struct iovec iov; 140 union { 141 struct { 142 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 143 } fs_load; 144 struct { 145 uint64_t length; 146 } truncate; 147 struct { 148 struct spdk_io_channel *channel; 149 void *pin_buf; 150 int is_read; 151 off_t offset; 152 size_t length; 153 uint64_t start_lba; 154 uint64_t num_lba; 155 uint32_t blocklen; 156 } rw; 157 struct { 158 const char *old_name; 159 const char *new_name; 160 } rename; 161 struct { 162 struct cache_buffer *cache_buffer; 163 uint64_t length; 164 } flush; 165 struct { 166 struct cache_buffer *cache_buffer; 167 uint64_t length; 168 uint64_t offset; 169 } readahead; 170 struct { 171 uint64_t offset; 172 TAILQ_ENTRY(spdk_fs_request) tailq; 173 bool xattr_in_progress; 174 } sync; 175 struct { 176 uint32_t num_clusters; 177 } resize; 178 struct { 179 const char *name; 180 uint32_t flags; 181 TAILQ_ENTRY(spdk_fs_request) tailq; 182 } open; 183 struct { 184 const char *name; 185 struct spdk_blob *blob; 186 } create; 187 struct { 188 const char *name; 189 } delete; 190 struct { 191 const char *name; 192 } stat; 193 } op; 194 }; 195 196 static void cache_free_buffers(struct spdk_file *file); 197 static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs); 198 static void spdk_fs_free_io_channels(struct spdk_filesystem *fs); 199 200 void 201 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 202 { 203 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 204 } 205 206 static void 207 __initialize_cache(void) 208 { 209 assert(g_cache_pool == NULL); 210 211 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 212 g_fs_cache_size / CACHE_BUFFER_SIZE, 213 CACHE_BUFFER_SIZE, 214 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 215 SPDK_ENV_SOCKET_ID_ANY); 216 if (!g_cache_pool) { 217 SPDK_ERRLOG("Create mempool failed, you may " 218 "increase the memory and try again\n"); 219 assert(false); 220 } 221 TAILQ_INIT(&g_caches); 222 pthread_spin_init(&g_caches_lock, 0); 223 } 224 225 static void 226 __free_cache(void) 227 { 228 assert(g_cache_pool != NULL); 229 230 spdk_mempool_free(g_cache_pool); 231 g_cache_pool = NULL; 232 } 233 234 static uint64_t 235 __file_get_blob_size(struct spdk_file *file) 236 { 237 uint64_t cluster_sz; 238 239 cluster_sz = file->fs->bs_opts.cluster_sz; 240 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 241 } 242 243 struct spdk_fs_request { 244 struct spdk_fs_cb_args args; 245 TAILQ_ENTRY(spdk_fs_request) link; 246 struct spdk_fs_channel *channel; 247 }; 248 249 struct spdk_fs_channel { 250 struct spdk_fs_request *req_mem; 251 TAILQ_HEAD(, spdk_fs_request) reqs; 252 sem_t sem; 253 struct spdk_filesystem *fs; 254 struct spdk_io_channel *bs_channel; 255 fs_send_request_fn send_request; 256 bool sync; 257 pthread_spinlock_t lock; 258 }; 259 260 /* For now, this is effectively an alias. But eventually we'll shift 261 * some data members over. */ 262 struct spdk_fs_thread_ctx { 263 struct spdk_fs_channel ch; 264 }; 265 266 static struct spdk_fs_request * 267 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 268 { 269 struct spdk_fs_request *req; 270 struct iovec *iovs = NULL; 271 272 if (iovcnt > 1) { 273 iovs = calloc(iovcnt, sizeof(struct iovec)); 274 if (!iovs) { 275 return NULL; 276 } 277 } 278 279 if (channel->sync) { 280 pthread_spin_lock(&channel->lock); 281 } 282 283 req = TAILQ_FIRST(&channel->reqs); 284 if (req) { 285 TAILQ_REMOVE(&channel->reqs, req, link); 286 } 287 288 if (channel->sync) { 289 pthread_spin_unlock(&channel->lock); 290 } 291 292 if (req == NULL) { 293 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 294 free(iovs); 295 return NULL; 296 } 297 memset(req, 0, sizeof(*req)); 298 req->channel = channel; 299 if (iovcnt > 1) { 300 req->args.iovs = iovs; 301 } else { 302 req->args.iovs = &req->args.iov; 303 } 304 req->args.iovcnt = iovcnt; 305 306 return req; 307 } 308 309 static struct spdk_fs_request * 310 alloc_fs_request(struct spdk_fs_channel *channel) 311 { 312 return alloc_fs_request_with_iov(channel, 0); 313 } 314 315 static void 316 free_fs_request(struct spdk_fs_request *req) 317 { 318 struct spdk_fs_channel *channel = req->channel; 319 320 if (req->args.iovcnt > 1) { 321 free(req->args.iovs); 322 } 323 324 if (channel->sync) { 325 pthread_spin_lock(&channel->lock); 326 } 327 328 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 329 330 if (channel->sync) { 331 pthread_spin_unlock(&channel->lock); 332 } 333 } 334 335 static int 336 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 337 uint32_t max_ops) 338 { 339 uint32_t i; 340 341 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 342 if (!channel->req_mem) { 343 return -1; 344 } 345 346 TAILQ_INIT(&channel->reqs); 347 sem_init(&channel->sem, 0, 0); 348 349 for (i = 0; i < max_ops; i++) { 350 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 351 } 352 353 channel->fs = fs; 354 355 return 0; 356 } 357 358 static int 359 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 360 { 361 struct spdk_filesystem *fs; 362 struct spdk_fs_channel *channel = ctx_buf; 363 364 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 365 366 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 367 } 368 369 static int 370 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 371 { 372 struct spdk_filesystem *fs; 373 struct spdk_fs_channel *channel = ctx_buf; 374 375 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 376 377 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 378 } 379 380 static int 381 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 382 { 383 struct spdk_filesystem *fs; 384 struct spdk_fs_channel *channel = ctx_buf; 385 386 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 387 388 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 389 } 390 391 static void 392 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 393 { 394 struct spdk_fs_channel *channel = ctx_buf; 395 396 free(channel->req_mem); 397 if (channel->bs_channel != NULL) { 398 spdk_bs_free_io_channel(channel->bs_channel); 399 } 400 } 401 402 static void 403 __send_request_direct(fs_request_fn fn, void *arg) 404 { 405 fn(arg); 406 } 407 408 static void 409 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 410 { 411 fs->bs = bs; 412 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 413 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 414 fs->md_target.md_fs_channel->send_request = __send_request_direct; 415 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 416 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 417 418 pthread_mutex_lock(&g_cache_init_lock); 419 if (g_fs_count == 0) { 420 __initialize_cache(); 421 } 422 g_fs_count++; 423 pthread_mutex_unlock(&g_cache_init_lock); 424 } 425 426 static void 427 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 428 { 429 struct spdk_fs_request *req = ctx; 430 struct spdk_fs_cb_args *args = &req->args; 431 struct spdk_filesystem *fs = args->fs; 432 433 if (bserrno == 0) { 434 common_fs_bs_init(fs, bs); 435 } else { 436 free(fs); 437 fs = NULL; 438 } 439 440 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 441 free_fs_request(req); 442 } 443 444 static void 445 fs_conf_parse(void) 446 { 447 struct spdk_conf_section *sp; 448 449 sp = spdk_conf_find_section(NULL, "Blobfs"); 450 if (sp == NULL) { 451 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 452 return; 453 } 454 455 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 456 if (g_fs_cache_buffer_shift <= 0) { 457 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 458 } 459 } 460 461 static struct spdk_filesystem * 462 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 463 { 464 struct spdk_filesystem *fs; 465 466 fs = calloc(1, sizeof(*fs)); 467 if (fs == NULL) { 468 return NULL; 469 } 470 471 fs->bdev = dev; 472 fs->send_request = send_request_fn; 473 TAILQ_INIT(&fs->files); 474 475 fs->md_target.max_ops = 512; 476 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 477 sizeof(struct spdk_fs_channel), "blobfs_md"); 478 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 479 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 480 481 fs->sync_target.max_ops = 512; 482 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 483 sizeof(struct spdk_fs_channel), "blobfs_sync"); 484 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 485 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 486 487 fs->io_target.max_ops = 512; 488 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 489 sizeof(struct spdk_fs_channel), "blobfs_io"); 490 491 return fs; 492 } 493 494 static void 495 __wake_caller(void *arg, int fserrno) 496 { 497 struct spdk_fs_cb_args *args = arg; 498 499 args->rc = fserrno; 500 sem_post(args->sem); 501 } 502 503 void 504 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 505 fs_send_request_fn send_request_fn, 506 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 507 { 508 struct spdk_filesystem *fs; 509 struct spdk_fs_request *req; 510 struct spdk_fs_cb_args *args; 511 struct spdk_bs_opts opts = {}; 512 513 fs = fs_alloc(dev, send_request_fn); 514 if (fs == NULL) { 515 cb_fn(cb_arg, NULL, -ENOMEM); 516 return; 517 } 518 519 fs_conf_parse(); 520 521 req = alloc_fs_request(fs->md_target.md_fs_channel); 522 if (req == NULL) { 523 spdk_fs_free_io_channels(fs); 524 spdk_fs_io_device_unregister(fs); 525 cb_fn(cb_arg, NULL, -ENOMEM); 526 return; 527 } 528 529 args = &req->args; 530 args->fn.fs_op_with_handle = cb_fn; 531 args->arg = cb_arg; 532 args->fs = fs; 533 534 spdk_bs_opts_init(&opts); 535 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 536 if (opt) { 537 opts.cluster_sz = opt->cluster_sz; 538 } 539 spdk_bs_init(dev, &opts, init_cb, req); 540 } 541 542 static struct spdk_file * 543 file_alloc(struct spdk_filesystem *fs) 544 { 545 struct spdk_file *file; 546 547 file = calloc(1, sizeof(*file)); 548 if (file == NULL) { 549 return NULL; 550 } 551 552 file->tree = calloc(1, sizeof(*file->tree)); 553 if (file->tree == NULL) { 554 free(file); 555 return NULL; 556 } 557 558 file->fs = fs; 559 TAILQ_INIT(&file->open_requests); 560 TAILQ_INIT(&file->sync_requests); 561 pthread_spin_init(&file->lock, 0); 562 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 563 file->priority = SPDK_FILE_PRIORITY_LOW; 564 return file; 565 } 566 567 static void fs_load_done(void *ctx, int bserrno); 568 569 static int 570 _handle_deleted_files(struct spdk_fs_request *req) 571 { 572 struct spdk_fs_cb_args *args = &req->args; 573 struct spdk_filesystem *fs = args->fs; 574 575 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 576 struct spdk_deleted_file *deleted_file; 577 578 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 579 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 580 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 581 free(deleted_file); 582 return 0; 583 } 584 585 return 1; 586 } 587 588 static void 589 fs_load_done(void *ctx, int bserrno) 590 { 591 struct spdk_fs_request *req = ctx; 592 struct spdk_fs_cb_args *args = &req->args; 593 struct spdk_filesystem *fs = args->fs; 594 595 /* The filesystem has been loaded. Now check if there are any files that 596 * were marked for deletion before last unload. Do not complete the 597 * fs_load callback until all of them have been deleted on disk. 598 */ 599 if (_handle_deleted_files(req) == 0) { 600 /* We found a file that's been marked for deleting but not actually 601 * deleted yet. This function will get called again once the delete 602 * operation is completed. 603 */ 604 return; 605 } 606 607 args->fn.fs_op_with_handle(args->arg, fs, 0); 608 free_fs_request(req); 609 610 } 611 612 static void 613 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 614 { 615 struct spdk_fs_request *req = ctx; 616 struct spdk_fs_cb_args *args = &req->args; 617 struct spdk_filesystem *fs = args->fs; 618 uint64_t *length; 619 const char *name; 620 uint32_t *is_deleted; 621 size_t value_len; 622 623 if (rc < 0) { 624 args->fn.fs_op_with_handle(args->arg, fs, rc); 625 free_fs_request(req); 626 return; 627 } 628 629 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 630 if (rc < 0) { 631 args->fn.fs_op_with_handle(args->arg, fs, rc); 632 free_fs_request(req); 633 return; 634 } 635 636 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 637 if (rc < 0) { 638 args->fn.fs_op_with_handle(args->arg, fs, rc); 639 free_fs_request(req); 640 return; 641 } 642 643 assert(value_len == 8); 644 645 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 646 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 647 if (rc < 0) { 648 struct spdk_file *f; 649 650 f = file_alloc(fs); 651 if (f == NULL) { 652 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 653 free_fs_request(req); 654 return; 655 } 656 657 f->name = strdup(name); 658 f->blobid = spdk_blob_get_id(blob); 659 f->length = *length; 660 f->length_flushed = *length; 661 f->append_pos = *length; 662 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 663 } else { 664 struct spdk_deleted_file *deleted_file; 665 666 deleted_file = calloc(1, sizeof(*deleted_file)); 667 if (deleted_file == NULL) { 668 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 669 free_fs_request(req); 670 return; 671 } 672 deleted_file->id = spdk_blob_get_id(blob); 673 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 674 } 675 } 676 677 static void 678 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 679 { 680 struct spdk_fs_request *req = ctx; 681 struct spdk_fs_cb_args *args = &req->args; 682 struct spdk_filesystem *fs = args->fs; 683 struct spdk_bs_type bstype; 684 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 685 static const struct spdk_bs_type zeros; 686 687 if (bserrno != 0) { 688 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 689 free_fs_request(req); 690 free(fs); 691 return; 692 } 693 694 bstype = spdk_bs_get_bstype(bs); 695 696 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 697 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 698 spdk_bs_set_bstype(bs, blobfs_type); 699 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 700 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 701 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 702 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 703 free_fs_request(req); 704 free(fs); 705 return; 706 } 707 708 common_fs_bs_init(fs, bs); 709 fs_load_done(req, 0); 710 } 711 712 static void 713 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 714 { 715 assert(fs != NULL); 716 spdk_io_device_unregister(&fs->md_target, NULL); 717 spdk_io_device_unregister(&fs->sync_target, NULL); 718 spdk_io_device_unregister(&fs->io_target, NULL); 719 free(fs); 720 } 721 722 static void 723 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 724 { 725 assert(fs != NULL); 726 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 727 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 728 } 729 730 void 731 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 732 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 733 { 734 struct spdk_filesystem *fs; 735 struct spdk_fs_cb_args *args; 736 struct spdk_fs_request *req; 737 struct spdk_bs_opts bs_opts; 738 739 fs = fs_alloc(dev, send_request_fn); 740 if (fs == NULL) { 741 cb_fn(cb_arg, NULL, -ENOMEM); 742 return; 743 } 744 745 fs_conf_parse(); 746 747 req = alloc_fs_request(fs->md_target.md_fs_channel); 748 if (req == NULL) { 749 spdk_fs_free_io_channels(fs); 750 spdk_fs_io_device_unregister(fs); 751 cb_fn(cb_arg, NULL, -ENOMEM); 752 return; 753 } 754 755 args = &req->args; 756 args->fn.fs_op_with_handle = cb_fn; 757 args->arg = cb_arg; 758 args->fs = fs; 759 TAILQ_INIT(&args->op.fs_load.deleted_files); 760 spdk_bs_opts_init(&bs_opts); 761 bs_opts.iter_cb_fn = iter_cb; 762 bs_opts.iter_cb_arg = req; 763 spdk_bs_load(dev, &bs_opts, load_cb, req); 764 } 765 766 static void 767 unload_cb(void *ctx, int bserrno) 768 { 769 struct spdk_fs_request *req = ctx; 770 struct spdk_fs_cb_args *args = &req->args; 771 struct spdk_filesystem *fs = args->fs; 772 struct spdk_file *file, *tmp; 773 774 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 775 TAILQ_REMOVE(&fs->files, file, tailq); 776 cache_free_buffers(file); 777 free(file->name); 778 free(file->tree); 779 free(file); 780 } 781 782 pthread_mutex_lock(&g_cache_init_lock); 783 g_fs_count--; 784 if (g_fs_count == 0) { 785 __free_cache(); 786 } 787 pthread_mutex_unlock(&g_cache_init_lock); 788 789 args->fn.fs_op(args->arg, bserrno); 790 free(req); 791 792 spdk_fs_io_device_unregister(fs); 793 } 794 795 void 796 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 797 { 798 struct spdk_fs_request *req; 799 struct spdk_fs_cb_args *args; 800 801 /* 802 * We must free the md_channel before unloading the blobstore, so just 803 * allocate this request from the general heap. 804 */ 805 req = calloc(1, sizeof(*req)); 806 if (req == NULL) { 807 cb_fn(cb_arg, -ENOMEM); 808 return; 809 } 810 811 args = &req->args; 812 args->fn.fs_op = cb_fn; 813 args->arg = cb_arg; 814 args->fs = fs; 815 816 spdk_fs_free_io_channels(fs); 817 spdk_bs_unload(fs->bs, unload_cb, req); 818 } 819 820 static struct spdk_file * 821 fs_find_file(struct spdk_filesystem *fs, const char *name) 822 { 823 struct spdk_file *file; 824 825 TAILQ_FOREACH(file, &fs->files, tailq) { 826 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 827 return file; 828 } 829 } 830 831 return NULL; 832 } 833 834 void 835 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 836 spdk_file_stat_op_complete cb_fn, void *cb_arg) 837 { 838 struct spdk_file_stat stat; 839 struct spdk_file *f = NULL; 840 841 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 842 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 843 return; 844 } 845 846 f = fs_find_file(fs, name); 847 if (f != NULL) { 848 stat.blobid = f->blobid; 849 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 850 cb_fn(cb_arg, &stat, 0); 851 return; 852 } 853 854 cb_fn(cb_arg, NULL, -ENOENT); 855 } 856 857 static void 858 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 859 { 860 struct spdk_fs_request *req = arg; 861 struct spdk_fs_cb_args *args = &req->args; 862 863 args->rc = fserrno; 864 if (fserrno == 0) { 865 memcpy(args->arg, stat, sizeof(*stat)); 866 } 867 sem_post(args->sem); 868 } 869 870 static void 871 __file_stat(void *arg) 872 { 873 struct spdk_fs_request *req = arg; 874 struct spdk_fs_cb_args *args = &req->args; 875 876 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 877 args->fn.stat_op, req); 878 } 879 880 int 881 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 882 const char *name, struct spdk_file_stat *stat) 883 { 884 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 885 struct spdk_fs_request *req; 886 int rc; 887 888 req = alloc_fs_request(channel); 889 if (req == NULL) { 890 return -ENOMEM; 891 } 892 893 req->args.fs = fs; 894 req->args.op.stat.name = name; 895 req->args.fn.stat_op = __copy_stat; 896 req->args.arg = stat; 897 req->args.sem = &channel->sem; 898 channel->send_request(__file_stat, req); 899 sem_wait(&channel->sem); 900 901 rc = req->args.rc; 902 free_fs_request(req); 903 904 return rc; 905 } 906 907 static void 908 fs_create_blob_close_cb(void *ctx, int bserrno) 909 { 910 int rc; 911 struct spdk_fs_request *req = ctx; 912 struct spdk_fs_cb_args *args = &req->args; 913 914 rc = args->rc ? args->rc : bserrno; 915 args->fn.file_op(args->arg, rc); 916 free_fs_request(req); 917 } 918 919 static void 920 fs_create_blob_resize_cb(void *ctx, int bserrno) 921 { 922 struct spdk_fs_request *req = ctx; 923 struct spdk_fs_cb_args *args = &req->args; 924 struct spdk_file *f = args->file; 925 struct spdk_blob *blob = args->op.create.blob; 926 uint64_t length = 0; 927 928 args->rc = bserrno; 929 if (bserrno) { 930 spdk_blob_close(blob, fs_create_blob_close_cb, args); 931 return; 932 } 933 934 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 935 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 936 937 spdk_blob_close(blob, fs_create_blob_close_cb, args); 938 } 939 940 static void 941 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 942 { 943 struct spdk_fs_request *req = ctx; 944 struct spdk_fs_cb_args *args = &req->args; 945 946 if (bserrno) { 947 args->fn.file_op(args->arg, bserrno); 948 free_fs_request(req); 949 return; 950 } 951 952 args->op.create.blob = blob; 953 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 954 } 955 956 static void 957 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 958 { 959 struct spdk_fs_request *req = ctx; 960 struct spdk_fs_cb_args *args = &req->args; 961 struct spdk_file *f = args->file; 962 963 if (bserrno) { 964 args->fn.file_op(args->arg, bserrno); 965 free_fs_request(req); 966 return; 967 } 968 969 f->blobid = blobid; 970 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 971 } 972 973 void 974 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 975 spdk_file_op_complete cb_fn, void *cb_arg) 976 { 977 struct spdk_file *file; 978 struct spdk_fs_request *req; 979 struct spdk_fs_cb_args *args; 980 981 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 982 cb_fn(cb_arg, -ENAMETOOLONG); 983 return; 984 } 985 986 file = fs_find_file(fs, name); 987 if (file != NULL) { 988 cb_fn(cb_arg, -EEXIST); 989 return; 990 } 991 992 file = file_alloc(fs); 993 if (file == NULL) { 994 cb_fn(cb_arg, -ENOMEM); 995 return; 996 } 997 998 req = alloc_fs_request(fs->md_target.md_fs_channel); 999 if (req == NULL) { 1000 cb_fn(cb_arg, -ENOMEM); 1001 return; 1002 } 1003 1004 args = &req->args; 1005 args->file = file; 1006 args->fn.file_op = cb_fn; 1007 args->arg = cb_arg; 1008 1009 file->name = strdup(name); 1010 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1011 } 1012 1013 static void 1014 __fs_create_file_done(void *arg, int fserrno) 1015 { 1016 struct spdk_fs_request *req = arg; 1017 struct spdk_fs_cb_args *args = &req->args; 1018 1019 args->rc = fserrno; 1020 sem_post(args->sem); 1021 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1022 } 1023 1024 static void 1025 __fs_create_file(void *arg) 1026 { 1027 struct spdk_fs_request *req = arg; 1028 struct spdk_fs_cb_args *args = &req->args; 1029 1030 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1031 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1032 } 1033 1034 int 1035 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1036 { 1037 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1038 struct spdk_fs_request *req; 1039 struct spdk_fs_cb_args *args; 1040 int rc; 1041 1042 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1043 1044 req = alloc_fs_request(channel); 1045 if (req == NULL) { 1046 return -ENOMEM; 1047 } 1048 1049 args = &req->args; 1050 args->fs = fs; 1051 args->op.create.name = name; 1052 args->sem = &channel->sem; 1053 fs->send_request(__fs_create_file, req); 1054 sem_wait(&channel->sem); 1055 rc = args->rc; 1056 free_fs_request(req); 1057 1058 return rc; 1059 } 1060 1061 static void 1062 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1063 { 1064 struct spdk_fs_request *req = ctx; 1065 struct spdk_fs_cb_args *args = &req->args; 1066 struct spdk_file *f = args->file; 1067 1068 f->blob = blob; 1069 while (!TAILQ_EMPTY(&f->open_requests)) { 1070 req = TAILQ_FIRST(&f->open_requests); 1071 args = &req->args; 1072 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1073 args->fn.file_op_with_handle(args->arg, f, bserrno); 1074 free_fs_request(req); 1075 } 1076 } 1077 1078 static void 1079 fs_open_blob_create_cb(void *ctx, int bserrno) 1080 { 1081 struct spdk_fs_request *req = ctx; 1082 struct spdk_fs_cb_args *args = &req->args; 1083 struct spdk_file *file = args->file; 1084 struct spdk_filesystem *fs = args->fs; 1085 1086 if (file == NULL) { 1087 /* 1088 * This is from an open with CREATE flag - the file 1089 * is now created so look it up in the file list for this 1090 * filesystem. 1091 */ 1092 file = fs_find_file(fs, args->op.open.name); 1093 assert(file != NULL); 1094 args->file = file; 1095 } 1096 1097 file->ref_count++; 1098 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1099 if (file->ref_count == 1) { 1100 assert(file->blob == NULL); 1101 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1102 } else if (file->blob != NULL) { 1103 fs_open_blob_done(req, file->blob, 0); 1104 } else { 1105 /* 1106 * The blob open for this file is in progress due to a previous 1107 * open request. When that open completes, it will invoke the 1108 * open callback for this request. 1109 */ 1110 } 1111 } 1112 1113 void 1114 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1115 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1116 { 1117 struct spdk_file *f = NULL; 1118 struct spdk_fs_request *req; 1119 struct spdk_fs_cb_args *args; 1120 1121 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1122 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1123 return; 1124 } 1125 1126 f = fs_find_file(fs, name); 1127 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1128 cb_fn(cb_arg, NULL, -ENOENT); 1129 return; 1130 } 1131 1132 if (f != NULL && f->is_deleted == true) { 1133 cb_fn(cb_arg, NULL, -ENOENT); 1134 return; 1135 } 1136 1137 req = alloc_fs_request(fs->md_target.md_fs_channel); 1138 if (req == NULL) { 1139 cb_fn(cb_arg, NULL, -ENOMEM); 1140 return; 1141 } 1142 1143 args = &req->args; 1144 args->fn.file_op_with_handle = cb_fn; 1145 args->arg = cb_arg; 1146 args->file = f; 1147 args->fs = fs; 1148 args->op.open.name = name; 1149 1150 if (f == NULL) { 1151 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1152 } else { 1153 fs_open_blob_create_cb(req, 0); 1154 } 1155 } 1156 1157 static void 1158 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1159 { 1160 struct spdk_fs_request *req = arg; 1161 struct spdk_fs_cb_args *args = &req->args; 1162 1163 args->file = file; 1164 __wake_caller(args, bserrno); 1165 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1166 } 1167 1168 static void 1169 __fs_open_file(void *arg) 1170 { 1171 struct spdk_fs_request *req = arg; 1172 struct spdk_fs_cb_args *args = &req->args; 1173 1174 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1175 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1176 __fs_open_file_done, req); 1177 } 1178 1179 int 1180 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1181 const char *name, uint32_t flags, struct spdk_file **file) 1182 { 1183 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1184 struct spdk_fs_request *req; 1185 struct spdk_fs_cb_args *args; 1186 int rc; 1187 1188 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1189 1190 req = alloc_fs_request(channel); 1191 if (req == NULL) { 1192 return -ENOMEM; 1193 } 1194 1195 args = &req->args; 1196 args->fs = fs; 1197 args->op.open.name = name; 1198 args->op.open.flags = flags; 1199 args->sem = &channel->sem; 1200 fs->send_request(__fs_open_file, req); 1201 sem_wait(&channel->sem); 1202 rc = args->rc; 1203 if (rc == 0) { 1204 *file = args->file; 1205 } else { 1206 *file = NULL; 1207 } 1208 free_fs_request(req); 1209 1210 return rc; 1211 } 1212 1213 static void 1214 fs_rename_blob_close_cb(void *ctx, int bserrno) 1215 { 1216 struct spdk_fs_request *req = ctx; 1217 struct spdk_fs_cb_args *args = &req->args; 1218 1219 args->fn.fs_op(args->arg, bserrno); 1220 free_fs_request(req); 1221 } 1222 1223 static void 1224 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1225 { 1226 struct spdk_fs_request *req = ctx; 1227 struct spdk_fs_cb_args *args = &req->args; 1228 const char *new_name = args->op.rename.new_name; 1229 1230 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1231 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1232 } 1233 1234 static void 1235 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1236 { 1237 struct spdk_fs_cb_args *args = &req->args; 1238 struct spdk_file *f; 1239 1240 f = fs_find_file(args->fs, args->op.rename.old_name); 1241 if (f == NULL) { 1242 args->fn.fs_op(args->arg, -ENOENT); 1243 free_fs_request(req); 1244 return; 1245 } 1246 1247 free(f->name); 1248 f->name = strdup(args->op.rename.new_name); 1249 args->file = f; 1250 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1251 } 1252 1253 static void 1254 fs_rename_delete_done(void *arg, int fserrno) 1255 { 1256 __spdk_fs_md_rename_file(arg); 1257 } 1258 1259 void 1260 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1261 const char *old_name, const char *new_name, 1262 spdk_file_op_complete cb_fn, void *cb_arg) 1263 { 1264 struct spdk_file *f; 1265 struct spdk_fs_request *req; 1266 struct spdk_fs_cb_args *args; 1267 1268 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1269 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1270 cb_fn(cb_arg, -ENAMETOOLONG); 1271 return; 1272 } 1273 1274 req = alloc_fs_request(fs->md_target.md_fs_channel); 1275 if (req == NULL) { 1276 cb_fn(cb_arg, -ENOMEM); 1277 return; 1278 } 1279 1280 args = &req->args; 1281 args->fn.fs_op = cb_fn; 1282 args->fs = fs; 1283 args->arg = cb_arg; 1284 args->op.rename.old_name = old_name; 1285 args->op.rename.new_name = new_name; 1286 1287 f = fs_find_file(fs, new_name); 1288 if (f == NULL) { 1289 __spdk_fs_md_rename_file(req); 1290 return; 1291 } 1292 1293 /* 1294 * The rename overwrites an existing file. So delete the existing file, then 1295 * do the actual rename. 1296 */ 1297 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1298 } 1299 1300 static void 1301 __fs_rename_file_done(void *arg, int fserrno) 1302 { 1303 struct spdk_fs_request *req = arg; 1304 struct spdk_fs_cb_args *args = &req->args; 1305 1306 __wake_caller(args, fserrno); 1307 } 1308 1309 static void 1310 __fs_rename_file(void *arg) 1311 { 1312 struct spdk_fs_request *req = arg; 1313 struct spdk_fs_cb_args *args = &req->args; 1314 1315 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1316 __fs_rename_file_done, req); 1317 } 1318 1319 int 1320 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1321 const char *old_name, const char *new_name) 1322 { 1323 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1324 struct spdk_fs_request *req; 1325 struct spdk_fs_cb_args *args; 1326 int rc; 1327 1328 req = alloc_fs_request(channel); 1329 if (req == NULL) { 1330 return -ENOMEM; 1331 } 1332 1333 args = &req->args; 1334 1335 args->fs = fs; 1336 args->op.rename.old_name = old_name; 1337 args->op.rename.new_name = new_name; 1338 args->sem = &channel->sem; 1339 fs->send_request(__fs_rename_file, req); 1340 sem_wait(&channel->sem); 1341 rc = args->rc; 1342 free_fs_request(req); 1343 return rc; 1344 } 1345 1346 static void 1347 blob_delete_cb(void *ctx, int bserrno) 1348 { 1349 struct spdk_fs_request *req = ctx; 1350 struct spdk_fs_cb_args *args = &req->args; 1351 1352 args->fn.file_op(args->arg, bserrno); 1353 free_fs_request(req); 1354 } 1355 1356 void 1357 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1358 spdk_file_op_complete cb_fn, void *cb_arg) 1359 { 1360 struct spdk_file *f; 1361 spdk_blob_id blobid; 1362 struct spdk_fs_request *req; 1363 struct spdk_fs_cb_args *args; 1364 1365 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1366 1367 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1368 cb_fn(cb_arg, -ENAMETOOLONG); 1369 return; 1370 } 1371 1372 f = fs_find_file(fs, name); 1373 if (f == NULL) { 1374 cb_fn(cb_arg, -ENOENT); 1375 return; 1376 } 1377 1378 req = alloc_fs_request(fs->md_target.md_fs_channel); 1379 if (req == NULL) { 1380 cb_fn(cb_arg, -ENOMEM); 1381 return; 1382 } 1383 1384 args = &req->args; 1385 args->fn.file_op = cb_fn; 1386 args->arg = cb_arg; 1387 1388 if (f->ref_count > 0) { 1389 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1390 f->is_deleted = true; 1391 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1392 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1393 return; 1394 } 1395 1396 TAILQ_REMOVE(&fs->files, f, tailq); 1397 1398 cache_free_buffers(f); 1399 1400 blobid = f->blobid; 1401 1402 free(f->name); 1403 free(f->tree); 1404 free(f); 1405 1406 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1407 } 1408 1409 static void 1410 __fs_delete_file_done(void *arg, int fserrno) 1411 { 1412 struct spdk_fs_request *req = arg; 1413 struct spdk_fs_cb_args *args = &req->args; 1414 1415 __wake_caller(args, fserrno); 1416 } 1417 1418 static void 1419 __fs_delete_file(void *arg) 1420 { 1421 struct spdk_fs_request *req = arg; 1422 struct spdk_fs_cb_args *args = &req->args; 1423 1424 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1425 } 1426 1427 int 1428 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1429 const char *name) 1430 { 1431 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1432 struct spdk_fs_request *req; 1433 struct spdk_fs_cb_args *args; 1434 int rc; 1435 1436 req = alloc_fs_request(channel); 1437 if (req == NULL) { 1438 return -ENOMEM; 1439 } 1440 1441 args = &req->args; 1442 args->fs = fs; 1443 args->op.delete.name = name; 1444 args->sem = &channel->sem; 1445 fs->send_request(__fs_delete_file, req); 1446 sem_wait(&channel->sem); 1447 rc = args->rc; 1448 free_fs_request(req); 1449 1450 return rc; 1451 } 1452 1453 spdk_fs_iter 1454 spdk_fs_iter_first(struct spdk_filesystem *fs) 1455 { 1456 struct spdk_file *f; 1457 1458 f = TAILQ_FIRST(&fs->files); 1459 return f; 1460 } 1461 1462 spdk_fs_iter 1463 spdk_fs_iter_next(spdk_fs_iter iter) 1464 { 1465 struct spdk_file *f = iter; 1466 1467 if (f == NULL) { 1468 return NULL; 1469 } 1470 1471 f = TAILQ_NEXT(f, tailq); 1472 return f; 1473 } 1474 1475 const char * 1476 spdk_file_get_name(struct spdk_file *file) 1477 { 1478 return file->name; 1479 } 1480 1481 uint64_t 1482 spdk_file_get_length(struct spdk_file *file) 1483 { 1484 uint64_t length; 1485 1486 assert(file != NULL); 1487 1488 length = file->append_pos >= file->length ? file->append_pos : file->length; 1489 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1490 return length; 1491 } 1492 1493 static void 1494 fs_truncate_complete_cb(void *ctx, int bserrno) 1495 { 1496 struct spdk_fs_request *req = ctx; 1497 struct spdk_fs_cb_args *args = &req->args; 1498 1499 args->fn.file_op(args->arg, bserrno); 1500 free_fs_request(req); 1501 } 1502 1503 static void 1504 fs_truncate_resize_cb(void *ctx, int bserrno) 1505 { 1506 struct spdk_fs_request *req = ctx; 1507 struct spdk_fs_cb_args *args = &req->args; 1508 struct spdk_file *file = args->file; 1509 uint64_t *length = &args->op.truncate.length; 1510 1511 if (bserrno) { 1512 args->fn.file_op(args->arg, bserrno); 1513 free_fs_request(req); 1514 return; 1515 } 1516 1517 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1518 1519 file->length = *length; 1520 if (file->append_pos > file->length) { 1521 file->append_pos = file->length; 1522 } 1523 1524 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1525 } 1526 1527 static uint64_t 1528 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1529 { 1530 return (length + cluster_sz - 1) / cluster_sz; 1531 } 1532 1533 void 1534 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1535 spdk_file_op_complete cb_fn, void *cb_arg) 1536 { 1537 struct spdk_filesystem *fs; 1538 size_t num_clusters; 1539 struct spdk_fs_request *req; 1540 struct spdk_fs_cb_args *args; 1541 1542 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1543 if (length == file->length) { 1544 cb_fn(cb_arg, 0); 1545 return; 1546 } 1547 1548 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1549 if (req == NULL) { 1550 cb_fn(cb_arg, -ENOMEM); 1551 return; 1552 } 1553 1554 args = &req->args; 1555 args->fn.file_op = cb_fn; 1556 args->arg = cb_arg; 1557 args->file = file; 1558 args->op.truncate.length = length; 1559 fs = file->fs; 1560 1561 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1562 1563 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1564 } 1565 1566 static void 1567 __truncate(void *arg) 1568 { 1569 struct spdk_fs_request *req = arg; 1570 struct spdk_fs_cb_args *args = &req->args; 1571 1572 spdk_file_truncate_async(args->file, args->op.truncate.length, 1573 args->fn.file_op, args); 1574 } 1575 1576 int 1577 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1578 uint64_t length) 1579 { 1580 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1581 struct spdk_fs_request *req; 1582 struct spdk_fs_cb_args *args; 1583 int rc; 1584 1585 req = alloc_fs_request(channel); 1586 if (req == NULL) { 1587 return -ENOMEM; 1588 } 1589 1590 args = &req->args; 1591 1592 args->file = file; 1593 args->op.truncate.length = length; 1594 args->fn.file_op = __wake_caller; 1595 args->sem = &channel->sem; 1596 1597 channel->send_request(__truncate, req); 1598 sem_wait(&channel->sem); 1599 rc = args->rc; 1600 free_fs_request(req); 1601 1602 return rc; 1603 } 1604 1605 static void 1606 __rw_done(void *ctx, int bserrno) 1607 { 1608 struct spdk_fs_request *req = ctx; 1609 struct spdk_fs_cb_args *args = &req->args; 1610 1611 spdk_free(args->op.rw.pin_buf); 1612 args->fn.file_op(args->arg, bserrno); 1613 free_fs_request(req); 1614 } 1615 1616 static void 1617 __read_done(void *ctx, int bserrno) 1618 { 1619 struct spdk_fs_request *req = ctx; 1620 struct spdk_fs_cb_args *args = &req->args; 1621 1622 assert(req != NULL); 1623 if (args->op.rw.is_read) { 1624 memcpy(args->iovs[0].iov_base, 1625 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1626 args->iovs[0].iov_len); 1627 __rw_done(req, 0); 1628 } else { 1629 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1630 args->iovs[0].iov_base, 1631 args->iovs[0].iov_len); 1632 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1633 args->op.rw.pin_buf, 1634 args->op.rw.start_lba, args->op.rw.num_lba, 1635 __rw_done, req); 1636 } 1637 } 1638 1639 static void 1640 __do_blob_read(void *ctx, int fserrno) 1641 { 1642 struct spdk_fs_request *req = ctx; 1643 struct spdk_fs_cb_args *args = &req->args; 1644 1645 if (fserrno) { 1646 __rw_done(req, fserrno); 1647 return; 1648 } 1649 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1650 args->op.rw.pin_buf, 1651 args->op.rw.start_lba, args->op.rw.num_lba, 1652 __read_done, req); 1653 } 1654 1655 static void 1656 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1657 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1658 { 1659 uint64_t end_lba; 1660 1661 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1662 *start_lba = offset / *lba_size; 1663 end_lba = (offset + length - 1) / *lba_size; 1664 *num_lba = (end_lba - *start_lba + 1); 1665 } 1666 1667 static void 1668 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1669 void *payload, uint64_t offset, uint64_t length, 1670 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1671 { 1672 struct spdk_fs_request *req; 1673 struct spdk_fs_cb_args *args; 1674 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1675 uint64_t start_lba, num_lba, pin_buf_length; 1676 uint32_t lba_size; 1677 1678 if (is_read && offset + length > file->length) { 1679 cb_fn(cb_arg, -EINVAL); 1680 return; 1681 } 1682 1683 req = alloc_fs_request_with_iov(channel, 1); 1684 if (req == NULL) { 1685 cb_fn(cb_arg, -ENOMEM); 1686 return; 1687 } 1688 1689 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1690 1691 args = &req->args; 1692 args->fn.file_op = cb_fn; 1693 args->arg = cb_arg; 1694 args->file = file; 1695 args->op.rw.channel = channel->bs_channel; 1696 args->iovs[0].iov_base = payload; 1697 args->iovs[0].iov_len = (size_t)length; 1698 args->op.rw.is_read = is_read; 1699 args->op.rw.offset = offset; 1700 args->op.rw.blocklen = lba_size; 1701 1702 pin_buf_length = num_lba * lba_size; 1703 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1704 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1705 if (args->op.rw.pin_buf == NULL) { 1706 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1707 file->name, offset, length); 1708 free_fs_request(req); 1709 cb_fn(cb_arg, -ENOMEM); 1710 return; 1711 } 1712 1713 args->op.rw.start_lba = start_lba; 1714 args->op.rw.num_lba = num_lba; 1715 1716 if (!is_read && file->length < offset + length) { 1717 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1718 } else { 1719 __do_blob_read(req, 0); 1720 } 1721 } 1722 1723 void 1724 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1725 void *payload, uint64_t offset, uint64_t length, 1726 spdk_file_op_complete cb_fn, void *cb_arg) 1727 { 1728 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1729 } 1730 1731 void 1732 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1733 void *payload, uint64_t offset, uint64_t length, 1734 spdk_file_op_complete cb_fn, void *cb_arg) 1735 { 1736 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1737 file->name, offset, length); 1738 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1739 } 1740 1741 struct spdk_io_channel * 1742 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1743 { 1744 struct spdk_io_channel *io_channel; 1745 struct spdk_fs_channel *fs_channel; 1746 1747 io_channel = spdk_get_io_channel(&fs->io_target); 1748 fs_channel = spdk_io_channel_get_ctx(io_channel); 1749 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1750 fs_channel->send_request = __send_request_direct; 1751 1752 return io_channel; 1753 } 1754 1755 void 1756 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1757 { 1758 spdk_put_io_channel(channel); 1759 } 1760 1761 struct spdk_fs_thread_ctx * 1762 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1763 { 1764 struct spdk_fs_thread_ctx *ctx; 1765 1766 ctx = calloc(1, sizeof(*ctx)); 1767 if (!ctx) { 1768 return NULL; 1769 } 1770 1771 _spdk_fs_channel_create(fs, &ctx->ch, 512); 1772 1773 ctx->ch.send_request = fs->send_request; 1774 ctx->ch.sync = 1; 1775 pthread_spin_init(&ctx->ch.lock, 0); 1776 1777 return ctx; 1778 } 1779 1780 1781 void 1782 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1783 { 1784 _spdk_fs_channel_destroy(NULL, &ctx->ch); 1785 free(ctx); 1786 } 1787 1788 void 1789 spdk_fs_set_cache_size(uint64_t size_in_mb) 1790 { 1791 g_fs_cache_size = size_in_mb * 1024 * 1024; 1792 } 1793 1794 uint64_t 1795 spdk_fs_get_cache_size(void) 1796 { 1797 return g_fs_cache_size / (1024 * 1024); 1798 } 1799 1800 static void __file_flush(void *ctx); 1801 1802 static void * 1803 alloc_cache_memory_buffer(struct spdk_file *context) 1804 { 1805 struct spdk_file *file; 1806 void *buf; 1807 1808 buf = spdk_mempool_get(g_cache_pool); 1809 if (buf != NULL) { 1810 return buf; 1811 } 1812 1813 pthread_spin_lock(&g_caches_lock); 1814 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1815 if (!file->open_for_writing && 1816 file->priority == SPDK_FILE_PRIORITY_LOW && 1817 file != context) { 1818 break; 1819 } 1820 } 1821 pthread_spin_unlock(&g_caches_lock); 1822 if (file != NULL) { 1823 cache_free_buffers(file); 1824 buf = spdk_mempool_get(g_cache_pool); 1825 if (buf != NULL) { 1826 return buf; 1827 } 1828 } 1829 1830 pthread_spin_lock(&g_caches_lock); 1831 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1832 if (!file->open_for_writing && file != context) { 1833 break; 1834 } 1835 } 1836 pthread_spin_unlock(&g_caches_lock); 1837 if (file != NULL) { 1838 cache_free_buffers(file); 1839 buf = spdk_mempool_get(g_cache_pool); 1840 if (buf != NULL) { 1841 return buf; 1842 } 1843 } 1844 1845 pthread_spin_lock(&g_caches_lock); 1846 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1847 if (file != context) { 1848 break; 1849 } 1850 } 1851 pthread_spin_unlock(&g_caches_lock); 1852 if (file != NULL) { 1853 cache_free_buffers(file); 1854 buf = spdk_mempool_get(g_cache_pool); 1855 if (buf != NULL) { 1856 return buf; 1857 } 1858 } 1859 1860 return NULL; 1861 } 1862 1863 static struct cache_buffer * 1864 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1865 { 1866 struct cache_buffer *buf; 1867 int count = 0; 1868 1869 buf = calloc(1, sizeof(*buf)); 1870 if (buf == NULL) { 1871 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1872 return NULL; 1873 } 1874 1875 buf->buf = alloc_cache_memory_buffer(file); 1876 while (buf->buf == NULL) { 1877 /* 1878 * TODO: alloc_cache_memory_buffer() should eventually free 1879 * some buffers. Need a more sophisticated check here, instead 1880 * of just bailing if 100 tries does not result in getting a 1881 * free buffer. This will involve using the sync channel's 1882 * semaphore to block until a buffer becomes available. 1883 */ 1884 if (count++ == 100) { 1885 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 1886 file, offset); 1887 free(buf); 1888 return NULL; 1889 } 1890 buf->buf = alloc_cache_memory_buffer(file); 1891 } 1892 1893 buf->buf_size = CACHE_BUFFER_SIZE; 1894 buf->offset = offset; 1895 1896 pthread_spin_lock(&g_caches_lock); 1897 if (file->tree->present_mask == 0) { 1898 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1899 } 1900 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1901 pthread_spin_unlock(&g_caches_lock); 1902 1903 return buf; 1904 } 1905 1906 static struct cache_buffer * 1907 cache_append_buffer(struct spdk_file *file) 1908 { 1909 struct cache_buffer *last; 1910 1911 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1912 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1913 1914 last = cache_insert_buffer(file, file->append_pos); 1915 if (last == NULL) { 1916 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1917 return NULL; 1918 } 1919 1920 file->last = last; 1921 1922 return last; 1923 } 1924 1925 static void __check_sync_reqs(struct spdk_file *file); 1926 1927 static void 1928 __file_cache_finish_sync(void *ctx, int bserrno) 1929 { 1930 struct spdk_file *file = ctx; 1931 struct spdk_fs_request *sync_req; 1932 struct spdk_fs_cb_args *sync_args; 1933 1934 pthread_spin_lock(&file->lock); 1935 sync_req = TAILQ_FIRST(&file->sync_requests); 1936 sync_args = &sync_req->args; 1937 assert(sync_args->op.sync.offset <= file->length_flushed); 1938 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1939 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1940 pthread_spin_unlock(&file->lock); 1941 1942 sync_args->fn.file_op(sync_args->arg, bserrno); 1943 __check_sync_reqs(file); 1944 1945 pthread_spin_lock(&file->lock); 1946 free_fs_request(sync_req); 1947 pthread_spin_unlock(&file->lock); 1948 } 1949 1950 static void 1951 __check_sync_reqs(struct spdk_file *file) 1952 { 1953 struct spdk_fs_request *sync_req; 1954 1955 pthread_spin_lock(&file->lock); 1956 1957 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1958 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1959 break; 1960 } 1961 } 1962 1963 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1964 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1965 sync_req->args.op.sync.xattr_in_progress = true; 1966 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1967 sizeof(file->length_flushed)); 1968 1969 pthread_spin_unlock(&file->lock); 1970 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file); 1971 } else { 1972 pthread_spin_unlock(&file->lock); 1973 } 1974 } 1975 1976 static void 1977 __file_flush_done(void *ctx, int bserrno) 1978 { 1979 struct spdk_fs_request *req = ctx; 1980 struct spdk_fs_cb_args *args = &req->args; 1981 struct spdk_file *file = args->file; 1982 struct cache_buffer *next = args->op.flush.cache_buffer; 1983 1984 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1985 1986 pthread_spin_lock(&file->lock); 1987 next->in_progress = false; 1988 next->bytes_flushed += args->op.flush.length; 1989 file->length_flushed += args->op.flush.length; 1990 if (file->length_flushed > file->length) { 1991 file->length = file->length_flushed; 1992 } 1993 if (next->bytes_flushed == next->buf_size) { 1994 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1995 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1996 } 1997 1998 /* 1999 * Assert that there is no cached data that extends past the end of the underlying 2000 * blob. 2001 */ 2002 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2003 next->bytes_filled == 0); 2004 2005 pthread_spin_unlock(&file->lock); 2006 2007 __check_sync_reqs(file); 2008 2009 __file_flush(req); 2010 } 2011 2012 static void 2013 __file_flush(void *ctx) 2014 { 2015 struct spdk_fs_request *req = ctx; 2016 struct spdk_fs_cb_args *args = &req->args; 2017 struct spdk_file *file = args->file; 2018 struct cache_buffer *next; 2019 uint64_t offset, length, start_lba, num_lba; 2020 uint32_t lba_size; 2021 2022 pthread_spin_lock(&file->lock); 2023 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 2024 if (next == NULL || next->in_progress) { 2025 /* 2026 * There is either no data to flush, or a flush I/O is already in 2027 * progress. So return immediately - if a flush I/O is in 2028 * progress we will flush more data after that is completed. 2029 */ 2030 free_fs_request(req); 2031 if (next == NULL) { 2032 /* 2033 * For cases where a file's cache was evicted, and then the 2034 * file was later appended, we will write the data directly 2035 * to disk and bypass cache. So just update length_flushed 2036 * here to reflect that all data was already written to disk. 2037 */ 2038 file->length_flushed = file->append_pos; 2039 } 2040 pthread_spin_unlock(&file->lock); 2041 if (next == NULL) { 2042 /* 2043 * There is no data to flush, but we still need to check for any 2044 * outstanding sync requests to make sure metadata gets updated. 2045 */ 2046 __check_sync_reqs(file); 2047 } 2048 return; 2049 } 2050 2051 offset = next->offset + next->bytes_flushed; 2052 length = next->bytes_filled - next->bytes_flushed; 2053 if (length == 0) { 2054 free_fs_request(req); 2055 pthread_spin_unlock(&file->lock); 2056 return; 2057 } 2058 args->op.flush.length = length; 2059 args->op.flush.cache_buffer = next; 2060 2061 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2062 2063 next->in_progress = true; 2064 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2065 offset, length, start_lba, num_lba); 2066 pthread_spin_unlock(&file->lock); 2067 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2068 next->buf + (start_lba * lba_size) - next->offset, 2069 start_lba, num_lba, __file_flush_done, req); 2070 } 2071 2072 static void 2073 __file_extend_done(void *arg, int bserrno) 2074 { 2075 struct spdk_fs_cb_args *args = arg; 2076 2077 __wake_caller(args, bserrno); 2078 } 2079 2080 static void 2081 __file_extend_resize_cb(void *_args, int bserrno) 2082 { 2083 struct spdk_fs_cb_args *args = _args; 2084 struct spdk_file *file = args->file; 2085 2086 if (bserrno) { 2087 __wake_caller(args, bserrno); 2088 return; 2089 } 2090 2091 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2092 } 2093 2094 static void 2095 __file_extend_blob(void *_args) 2096 { 2097 struct spdk_fs_cb_args *args = _args; 2098 struct spdk_file *file = args->file; 2099 2100 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2101 } 2102 2103 static void 2104 __rw_from_file_done(void *ctx, int bserrno) 2105 { 2106 struct spdk_fs_request *req = ctx; 2107 2108 __wake_caller(&req->args, bserrno); 2109 free_fs_request(req); 2110 } 2111 2112 static void 2113 __rw_from_file(void *ctx) 2114 { 2115 struct spdk_fs_request *req = ctx; 2116 struct spdk_fs_cb_args *args = &req->args; 2117 struct spdk_file *file = args->file; 2118 2119 if (args->op.rw.is_read) { 2120 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2121 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2122 __rw_from_file_done, req); 2123 } else { 2124 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2125 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2126 __rw_from_file_done, req); 2127 } 2128 } 2129 2130 static int 2131 __send_rw_from_file(struct spdk_file *file, void *payload, 2132 uint64_t offset, uint64_t length, bool is_read, 2133 struct spdk_fs_channel *channel) 2134 { 2135 struct spdk_fs_request *req; 2136 struct spdk_fs_cb_args *args; 2137 2138 req = alloc_fs_request_with_iov(channel, 1); 2139 if (req == NULL) { 2140 sem_post(&channel->sem); 2141 return -ENOMEM; 2142 } 2143 2144 args = &req->args; 2145 args->file = file; 2146 args->sem = &channel->sem; 2147 args->iovs[0].iov_base = payload; 2148 args->iovs[0].iov_len = (size_t)length; 2149 args->op.rw.offset = offset; 2150 args->op.rw.is_read = is_read; 2151 file->fs->send_request(__rw_from_file, req); 2152 return 0; 2153 } 2154 2155 int 2156 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2157 void *payload, uint64_t offset, uint64_t length) 2158 { 2159 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2160 struct spdk_fs_request *flush_req; 2161 uint64_t rem_length, copy, blob_size, cluster_sz; 2162 uint32_t cache_buffers_filled = 0; 2163 uint8_t *cur_payload; 2164 struct cache_buffer *last; 2165 2166 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2167 2168 if (length == 0) { 2169 return 0; 2170 } 2171 2172 if (offset != file->append_pos) { 2173 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2174 return -EINVAL; 2175 } 2176 2177 pthread_spin_lock(&file->lock); 2178 file->open_for_writing = true; 2179 2180 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2181 cache_append_buffer(file); 2182 } 2183 2184 if (file->last == NULL) { 2185 int rc; 2186 2187 file->append_pos += length; 2188 pthread_spin_unlock(&file->lock); 2189 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2190 sem_wait(&channel->sem); 2191 return rc; 2192 } 2193 2194 blob_size = __file_get_blob_size(file); 2195 2196 if ((offset + length) > blob_size) { 2197 struct spdk_fs_cb_args extend_args = {}; 2198 2199 cluster_sz = file->fs->bs_opts.cluster_sz; 2200 extend_args.sem = &channel->sem; 2201 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2202 extend_args.file = file; 2203 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2204 pthread_spin_unlock(&file->lock); 2205 file->fs->send_request(__file_extend_blob, &extend_args); 2206 sem_wait(&channel->sem); 2207 if (extend_args.rc) { 2208 return extend_args.rc; 2209 } 2210 } 2211 2212 flush_req = alloc_fs_request(channel); 2213 if (flush_req == NULL) { 2214 pthread_spin_unlock(&file->lock); 2215 return -ENOMEM; 2216 } 2217 2218 last = file->last; 2219 rem_length = length; 2220 cur_payload = payload; 2221 while (rem_length > 0) { 2222 copy = last->buf_size - last->bytes_filled; 2223 if (copy > rem_length) { 2224 copy = rem_length; 2225 } 2226 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2227 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2228 file->append_pos += copy; 2229 if (file->length < file->append_pos) { 2230 file->length = file->append_pos; 2231 } 2232 cur_payload += copy; 2233 last->bytes_filled += copy; 2234 rem_length -= copy; 2235 if (last->bytes_filled == last->buf_size) { 2236 cache_buffers_filled++; 2237 last = cache_append_buffer(file); 2238 if (last == NULL) { 2239 BLOBFS_TRACE(file, "nomem\n"); 2240 free_fs_request(flush_req); 2241 pthread_spin_unlock(&file->lock); 2242 return -ENOMEM; 2243 } 2244 } 2245 } 2246 2247 pthread_spin_unlock(&file->lock); 2248 2249 if (cache_buffers_filled == 0) { 2250 free_fs_request(flush_req); 2251 return 0; 2252 } 2253 2254 flush_req->args.file = file; 2255 file->fs->send_request(__file_flush, flush_req); 2256 return 0; 2257 } 2258 2259 static void 2260 __readahead_done(void *ctx, int bserrno) 2261 { 2262 struct spdk_fs_request *req = ctx; 2263 struct spdk_fs_cb_args *args = &req->args; 2264 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2265 struct spdk_file *file = args->file; 2266 2267 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2268 2269 pthread_spin_lock(&file->lock); 2270 cache_buffer->bytes_filled = args->op.readahead.length; 2271 cache_buffer->bytes_flushed = args->op.readahead.length; 2272 cache_buffer->in_progress = false; 2273 pthread_spin_unlock(&file->lock); 2274 2275 free_fs_request(req); 2276 } 2277 2278 static void 2279 __readahead(void *ctx) 2280 { 2281 struct spdk_fs_request *req = ctx; 2282 struct spdk_fs_cb_args *args = &req->args; 2283 struct spdk_file *file = args->file; 2284 uint64_t offset, length, start_lba, num_lba; 2285 uint32_t lba_size; 2286 2287 offset = args->op.readahead.offset; 2288 length = args->op.readahead.length; 2289 assert(length > 0); 2290 2291 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2292 2293 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2294 offset, length, start_lba, num_lba); 2295 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2296 args->op.readahead.cache_buffer->buf, 2297 start_lba, num_lba, __readahead_done, req); 2298 } 2299 2300 static uint64_t 2301 __next_cache_buffer_offset(uint64_t offset) 2302 { 2303 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2304 } 2305 2306 static void 2307 check_readahead(struct spdk_file *file, uint64_t offset, 2308 struct spdk_fs_channel *channel) 2309 { 2310 struct spdk_fs_request *req; 2311 struct spdk_fs_cb_args *args; 2312 2313 offset = __next_cache_buffer_offset(offset); 2314 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2315 return; 2316 } 2317 2318 req = alloc_fs_request(channel); 2319 if (req == NULL) { 2320 return; 2321 } 2322 args = &req->args; 2323 2324 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2325 2326 args->file = file; 2327 args->op.readahead.offset = offset; 2328 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2329 if (!args->op.readahead.cache_buffer) { 2330 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2331 free_fs_request(req); 2332 return; 2333 } 2334 2335 args->op.readahead.cache_buffer->in_progress = true; 2336 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2337 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2338 } else { 2339 args->op.readahead.length = CACHE_BUFFER_SIZE; 2340 } 2341 file->fs->send_request(__readahead, req); 2342 } 2343 2344 static int 2345 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, 2346 struct spdk_fs_channel *channel) 2347 { 2348 struct cache_buffer *buf; 2349 int rc; 2350 2351 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2352 if (buf == NULL) { 2353 pthread_spin_unlock(&file->lock); 2354 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2355 pthread_spin_lock(&file->lock); 2356 return rc; 2357 } 2358 2359 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2360 length = buf->offset + buf->bytes_filled - offset; 2361 } 2362 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2363 memcpy(payload, &buf->buf[offset - buf->offset], length); 2364 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2365 pthread_spin_lock(&g_caches_lock); 2366 spdk_tree_remove_buffer(file->tree, buf); 2367 if (file->tree->present_mask == 0) { 2368 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2369 } 2370 pthread_spin_unlock(&g_caches_lock); 2371 } 2372 2373 sem_post(&channel->sem); 2374 return 0; 2375 } 2376 2377 int64_t 2378 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2379 void *payload, uint64_t offset, uint64_t length) 2380 { 2381 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2382 uint64_t final_offset, final_length; 2383 uint32_t sub_reads = 0; 2384 int rc = 0; 2385 2386 pthread_spin_lock(&file->lock); 2387 2388 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2389 2390 file->open_for_writing = false; 2391 2392 if (length == 0 || offset >= file->append_pos) { 2393 pthread_spin_unlock(&file->lock); 2394 return 0; 2395 } 2396 2397 if (offset + length > file->append_pos) { 2398 length = file->append_pos - offset; 2399 } 2400 2401 if (offset != file->next_seq_offset) { 2402 file->seq_byte_count = 0; 2403 } 2404 file->seq_byte_count += length; 2405 file->next_seq_offset = offset + length; 2406 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2407 check_readahead(file, offset, channel); 2408 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2409 } 2410 2411 final_length = 0; 2412 final_offset = offset + length; 2413 while (offset < final_offset) { 2414 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2415 if (length > (final_offset - offset)) { 2416 length = final_offset - offset; 2417 } 2418 rc = __file_read(file, payload, offset, length, channel); 2419 if (rc == 0) { 2420 final_length += length; 2421 } else { 2422 break; 2423 } 2424 payload += length; 2425 offset += length; 2426 sub_reads++; 2427 } 2428 pthread_spin_unlock(&file->lock); 2429 while (sub_reads-- > 0) { 2430 sem_wait(&channel->sem); 2431 } 2432 if (rc == 0) { 2433 return final_length; 2434 } else { 2435 return rc; 2436 } 2437 } 2438 2439 static void 2440 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2441 spdk_file_op_complete cb_fn, void *cb_arg) 2442 { 2443 struct spdk_fs_request *sync_req; 2444 struct spdk_fs_request *flush_req; 2445 struct spdk_fs_cb_args *sync_args; 2446 struct spdk_fs_cb_args *flush_args; 2447 2448 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2449 2450 pthread_spin_lock(&file->lock); 2451 if (file->append_pos <= file->length_flushed) { 2452 BLOBFS_TRACE(file, "done - no data to flush\n"); 2453 pthread_spin_unlock(&file->lock); 2454 cb_fn(cb_arg, 0); 2455 return; 2456 } 2457 2458 sync_req = alloc_fs_request(channel); 2459 if (!sync_req) { 2460 pthread_spin_unlock(&file->lock); 2461 cb_fn(cb_arg, -ENOMEM); 2462 return; 2463 } 2464 sync_args = &sync_req->args; 2465 2466 flush_req = alloc_fs_request(channel); 2467 if (!flush_req) { 2468 pthread_spin_unlock(&file->lock); 2469 cb_fn(cb_arg, -ENOMEM); 2470 return; 2471 } 2472 flush_args = &flush_req->args; 2473 2474 sync_args->file = file; 2475 sync_args->fn.file_op = cb_fn; 2476 sync_args->arg = cb_arg; 2477 sync_args->op.sync.offset = file->append_pos; 2478 sync_args->op.sync.xattr_in_progress = false; 2479 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2480 pthread_spin_unlock(&file->lock); 2481 2482 flush_args->file = file; 2483 channel->send_request(__file_flush, flush_req); 2484 } 2485 2486 int 2487 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2488 { 2489 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2490 struct spdk_fs_cb_args args = {}; 2491 2492 args.sem = &channel->sem; 2493 _file_sync(file, channel, __wake_caller, &args); 2494 sem_wait(&channel->sem); 2495 2496 return args.rc; 2497 } 2498 2499 void 2500 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2501 spdk_file_op_complete cb_fn, void *cb_arg) 2502 { 2503 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2504 2505 _file_sync(file, channel, cb_fn, cb_arg); 2506 } 2507 2508 void 2509 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2510 { 2511 BLOBFS_TRACE(file, "priority=%u\n", priority); 2512 file->priority = priority; 2513 2514 } 2515 2516 /* 2517 * Close routines 2518 */ 2519 2520 static void 2521 __file_close_async_done(void *ctx, int bserrno) 2522 { 2523 struct spdk_fs_request *req = ctx; 2524 struct spdk_fs_cb_args *args = &req->args; 2525 struct spdk_file *file = args->file; 2526 2527 if (file->is_deleted) { 2528 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2529 return; 2530 } 2531 2532 args->fn.file_op(args->arg, bserrno); 2533 free_fs_request(req); 2534 } 2535 2536 static void 2537 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2538 { 2539 struct spdk_blob *blob; 2540 2541 pthread_spin_lock(&file->lock); 2542 if (file->ref_count == 0) { 2543 pthread_spin_unlock(&file->lock); 2544 __file_close_async_done(req, -EBADF); 2545 return; 2546 } 2547 2548 file->ref_count--; 2549 if (file->ref_count > 0) { 2550 pthread_spin_unlock(&file->lock); 2551 req->args.fn.file_op(req->args.arg, 0); 2552 free_fs_request(req); 2553 return; 2554 } 2555 2556 pthread_spin_unlock(&file->lock); 2557 2558 blob = file->blob; 2559 file->blob = NULL; 2560 spdk_blob_close(blob, __file_close_async_done, req); 2561 } 2562 2563 static void 2564 __file_close_async__sync_done(void *arg, int fserrno) 2565 { 2566 struct spdk_fs_request *req = arg; 2567 struct spdk_fs_cb_args *args = &req->args; 2568 2569 __file_close_async(args->file, req); 2570 } 2571 2572 void 2573 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2574 { 2575 struct spdk_fs_request *req; 2576 struct spdk_fs_cb_args *args; 2577 2578 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2579 if (req == NULL) { 2580 cb_fn(cb_arg, -ENOMEM); 2581 return; 2582 } 2583 2584 args = &req->args; 2585 args->file = file; 2586 args->fn.file_op = cb_fn; 2587 args->arg = cb_arg; 2588 2589 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2590 } 2591 2592 static void 2593 __file_close(void *arg) 2594 { 2595 struct spdk_fs_request *req = arg; 2596 struct spdk_fs_cb_args *args = &req->args; 2597 struct spdk_file *file = args->file; 2598 2599 __file_close_async(file, req); 2600 } 2601 2602 int 2603 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2604 { 2605 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2606 struct spdk_fs_request *req; 2607 struct spdk_fs_cb_args *args; 2608 2609 req = alloc_fs_request(channel); 2610 if (req == NULL) { 2611 return -ENOMEM; 2612 } 2613 2614 args = &req->args; 2615 2616 spdk_file_sync(file, ctx); 2617 BLOBFS_TRACE(file, "name=%s\n", file->name); 2618 args->file = file; 2619 args->sem = &channel->sem; 2620 args->fn.file_op = __wake_caller; 2621 args->arg = req; 2622 channel->send_request(__file_close, req); 2623 sem_wait(&channel->sem); 2624 2625 return args->rc; 2626 } 2627 2628 int 2629 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2630 { 2631 if (size < sizeof(spdk_blob_id)) { 2632 return -EINVAL; 2633 } 2634 2635 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2636 2637 return sizeof(spdk_blob_id); 2638 } 2639 2640 static void 2641 cache_free_buffers(struct spdk_file *file) 2642 { 2643 BLOBFS_TRACE(file, "free=%s\n", file->name); 2644 pthread_spin_lock(&file->lock); 2645 pthread_spin_lock(&g_caches_lock); 2646 if (file->tree->present_mask == 0) { 2647 pthread_spin_unlock(&g_caches_lock); 2648 pthread_spin_unlock(&file->lock); 2649 return; 2650 } 2651 spdk_tree_free_buffers(file->tree); 2652 2653 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2654 /* If not freed, put it in the end of the queue */ 2655 if (file->tree->present_mask != 0) { 2656 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2657 } 2658 file->last = NULL; 2659 pthread_spin_unlock(&g_caches_lock); 2660 pthread_spin_unlock(&file->lock); 2661 } 2662 2663 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2664 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2665