1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "tree.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 #include "spdk/trace.h" 47 48 #define BLOBFS_TRACE(file, str, args...) 
\ 49 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 50 51 #define BLOBFS_TRACE_RW(file, str, args...) \ 52 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 53 54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 56 57 #define SPDK_BLOBFS_SIGNATURE "BLOBFS" 58 59 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 60 static struct spdk_mempool *g_cache_pool; 61 static TAILQ_HEAD(, spdk_file) g_caches; 62 static int g_fs_count = 0; 63 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 64 static pthread_spinlock_t g_caches_lock; 65 66 #define TRACE_GROUP_BLOBFS 0x7 67 #define TRACE_BLOBFS_XATTR_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0) 68 #define TRACE_BLOBFS_XATTR_END SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1) 69 #define TRACE_BLOBFS_OPEN SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2) 70 #define TRACE_BLOBFS_CLOSE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3) 71 #define TRACE_BLOBFS_DELETE_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4) 72 #define TRACE_BLOBFS_DELETE_DONE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5) 73 74 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS) 75 { 76 spdk_trace_register_description("BLOBFS_XATTR_START", 77 TRACE_BLOBFS_XATTR_START, 78 OWNER_NONE, OBJECT_NONE, 0, 79 SPDK_TRACE_ARG_TYPE_STR, 80 "file: "); 81 spdk_trace_register_description("BLOBFS_XATTR_END", 82 TRACE_BLOBFS_XATTR_END, 83 OWNER_NONE, OBJECT_NONE, 0, 84 SPDK_TRACE_ARG_TYPE_STR, 85 "file: "); 86 spdk_trace_register_description("BLOBFS_OPEN", 87 TRACE_BLOBFS_OPEN, 88 OWNER_NONE, OBJECT_NONE, 0, 89 SPDK_TRACE_ARG_TYPE_STR, 90 "file: "); 91 spdk_trace_register_description("BLOBFS_CLOSE", 92 TRACE_BLOBFS_CLOSE, 93 OWNER_NONE, OBJECT_NONE, 0, 94 SPDK_TRACE_ARG_TYPE_STR, 95 "file: "); 96 spdk_trace_register_description("BLOBFS_DELETE_START", 97 TRACE_BLOBFS_DELETE_START, 98 OWNER_NONE, OBJECT_NONE, 0, 99 SPDK_TRACE_ARG_TYPE_STR, 100 "file: "); 
101 spdk_trace_register_description("BLOBFS_DELETE_DONE", 102 TRACE_BLOBFS_DELETE_DONE, 103 OWNER_NONE, OBJECT_NONE, 0, 104 SPDK_TRACE_ARG_TYPE_STR, 105 "file: "); 106 } 107 108 void 109 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 110 { 111 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 112 free(cache_buffer); 113 } 114 115 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 116 117 struct spdk_file { 118 struct spdk_filesystem *fs; 119 struct spdk_blob *blob; 120 char *name; 121 uint64_t trace_arg_name; 122 uint64_t length; 123 bool is_deleted; 124 bool open_for_writing; 125 uint64_t length_flushed; 126 uint64_t length_xattr; 127 uint64_t append_pos; 128 uint64_t seq_byte_count; 129 uint64_t next_seq_offset; 130 uint32_t priority; 131 TAILQ_ENTRY(spdk_file) tailq; 132 spdk_blob_id blobid; 133 uint32_t ref_count; 134 pthread_spinlock_t lock; 135 struct cache_buffer *last; 136 struct cache_tree *tree; 137 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 138 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 139 TAILQ_ENTRY(spdk_file) cache_tailq; 140 }; 141 142 struct spdk_deleted_file { 143 spdk_blob_id id; 144 TAILQ_ENTRY(spdk_deleted_file) tailq; 145 }; 146 147 struct spdk_filesystem { 148 struct spdk_blob_store *bs; 149 TAILQ_HEAD(, spdk_file) files; 150 struct spdk_bs_opts bs_opts; 151 struct spdk_bs_dev *bdev; 152 fs_send_request_fn send_request; 153 154 struct { 155 uint32_t max_ops; 156 struct spdk_io_channel *sync_io_channel; 157 struct spdk_fs_channel *sync_fs_channel; 158 } sync_target; 159 160 struct { 161 uint32_t max_ops; 162 struct spdk_io_channel *md_io_channel; 163 struct spdk_fs_channel *md_fs_channel; 164 } md_target; 165 166 struct { 167 uint32_t max_ops; 168 } io_target; 169 }; 170 171 struct spdk_fs_cb_args { 172 union { 173 spdk_fs_op_with_handle_complete fs_op_with_handle; 174 spdk_fs_op_complete fs_op; 175 spdk_file_op_with_handle_complete file_op_with_handle; 176 spdk_file_op_complete file_op; 177 
spdk_file_stat_op_complete stat_op; 178 } fn; 179 void *arg; 180 sem_t *sem; 181 struct spdk_filesystem *fs; 182 struct spdk_file *file; 183 int rc; 184 struct iovec *iovs; 185 uint32_t iovcnt; 186 struct iovec iov; 187 union { 188 struct { 189 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 190 } fs_load; 191 struct { 192 uint64_t length; 193 } truncate; 194 struct { 195 struct spdk_io_channel *channel; 196 void *pin_buf; 197 int is_read; 198 off_t offset; 199 size_t length; 200 uint64_t start_lba; 201 uint64_t num_lba; 202 uint32_t blocklen; 203 } rw; 204 struct { 205 const char *old_name; 206 const char *new_name; 207 } rename; 208 struct { 209 struct cache_buffer *cache_buffer; 210 uint64_t length; 211 } flush; 212 struct { 213 struct cache_buffer *cache_buffer; 214 uint64_t length; 215 uint64_t offset; 216 } readahead; 217 struct { 218 /* offset of the file when the sync request was made */ 219 uint64_t offset; 220 TAILQ_ENTRY(spdk_fs_request) tailq; 221 bool xattr_in_progress; 222 /* length written to the xattr for this file - this should 223 * always be the same as the offset if only one thread is 224 * writing to the file, but could differ if multiple threads 225 * are appending 226 */ 227 uint64_t length; 228 } sync; 229 struct { 230 uint32_t num_clusters; 231 } resize; 232 struct { 233 const char *name; 234 uint32_t flags; 235 TAILQ_ENTRY(spdk_fs_request) tailq; 236 } open; 237 struct { 238 const char *name; 239 struct spdk_blob *blob; 240 } create; 241 struct { 242 const char *name; 243 } delete; 244 struct { 245 const char *name; 246 } stat; 247 } op; 248 }; 249 250 static void cache_free_buffers(struct spdk_file *file); 251 static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs); 252 static void spdk_fs_free_io_channels(struct spdk_filesystem *fs); 253 254 void 255 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 256 { 257 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 258 } 259 260 static void 261 __initialize_cache(void) 262 { 
263 assert(g_cache_pool == NULL); 264 265 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 266 g_fs_cache_size / CACHE_BUFFER_SIZE, 267 CACHE_BUFFER_SIZE, 268 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 269 SPDK_ENV_SOCKET_ID_ANY); 270 if (!g_cache_pool) { 271 SPDK_ERRLOG("Create mempool failed, you may " 272 "increase the memory and try again\n"); 273 assert(false); 274 } 275 TAILQ_INIT(&g_caches); 276 pthread_spin_init(&g_caches_lock, 0); 277 } 278 279 static void 280 __free_cache(void) 281 { 282 assert(g_cache_pool != NULL); 283 284 spdk_mempool_free(g_cache_pool); 285 g_cache_pool = NULL; 286 } 287 288 static uint64_t 289 __file_get_blob_size(struct spdk_file *file) 290 { 291 uint64_t cluster_sz; 292 293 cluster_sz = file->fs->bs_opts.cluster_sz; 294 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 295 } 296 297 struct spdk_fs_request { 298 struct spdk_fs_cb_args args; 299 TAILQ_ENTRY(spdk_fs_request) link; 300 struct spdk_fs_channel *channel; 301 }; 302 303 struct spdk_fs_channel { 304 struct spdk_fs_request *req_mem; 305 TAILQ_HEAD(, spdk_fs_request) reqs; 306 sem_t sem; 307 struct spdk_filesystem *fs; 308 struct spdk_io_channel *bs_channel; 309 fs_send_request_fn send_request; 310 bool sync; 311 uint32_t outstanding_reqs; 312 pthread_spinlock_t lock; 313 }; 314 315 /* For now, this is effectively an alias. But eventually we'll shift 316 * some data members over. 
*/ 317 struct spdk_fs_thread_ctx { 318 struct spdk_fs_channel ch; 319 }; 320 321 static struct spdk_fs_request * 322 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 323 { 324 struct spdk_fs_request *req; 325 struct iovec *iovs = NULL; 326 327 if (iovcnt > 1) { 328 iovs = calloc(iovcnt, sizeof(struct iovec)); 329 if (!iovs) { 330 return NULL; 331 } 332 } 333 334 if (channel->sync) { 335 pthread_spin_lock(&channel->lock); 336 } 337 338 req = TAILQ_FIRST(&channel->reqs); 339 if (req) { 340 channel->outstanding_reqs++; 341 TAILQ_REMOVE(&channel->reqs, req, link); 342 } 343 344 if (channel->sync) { 345 pthread_spin_unlock(&channel->lock); 346 } 347 348 if (req == NULL) { 349 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 350 free(iovs); 351 return NULL; 352 } 353 memset(req, 0, sizeof(*req)); 354 req->channel = channel; 355 if (iovcnt > 1) { 356 req->args.iovs = iovs; 357 } else { 358 req->args.iovs = &req->args.iov; 359 } 360 req->args.iovcnt = iovcnt; 361 362 return req; 363 } 364 365 static struct spdk_fs_request * 366 alloc_fs_request(struct spdk_fs_channel *channel) 367 { 368 return alloc_fs_request_with_iov(channel, 0); 369 } 370 371 static void 372 free_fs_request(struct spdk_fs_request *req) 373 { 374 struct spdk_fs_channel *channel = req->channel; 375 376 if (req->args.iovcnt > 1) { 377 free(req->args.iovs); 378 } 379 380 if (channel->sync) { 381 pthread_spin_lock(&channel->lock); 382 } 383 384 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 385 channel->outstanding_reqs--; 386 387 if (channel->sync) { 388 pthread_spin_unlock(&channel->lock); 389 } 390 } 391 392 static int 393 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 394 uint32_t max_ops) 395 { 396 uint32_t i; 397 398 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 399 if (!channel->req_mem) { 400 return -1; 401 } 402 403 channel->outstanding_reqs = 0; 404 TAILQ_INIT(&channel->reqs); 405 
sem_init(&channel->sem, 0, 0); 406 407 for (i = 0; i < max_ops; i++) { 408 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 409 } 410 411 channel->fs = fs; 412 413 return 0; 414 } 415 416 static int 417 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 418 { 419 struct spdk_filesystem *fs; 420 struct spdk_fs_channel *channel = ctx_buf; 421 422 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 423 424 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 425 } 426 427 static int 428 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 429 { 430 struct spdk_filesystem *fs; 431 struct spdk_fs_channel *channel = ctx_buf; 432 433 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 434 435 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 436 } 437 438 static int 439 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 440 { 441 struct spdk_filesystem *fs; 442 struct spdk_fs_channel *channel = ctx_buf; 443 444 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 445 446 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 447 } 448 449 static void 450 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 451 { 452 struct spdk_fs_channel *channel = ctx_buf; 453 454 if (channel->outstanding_reqs > 0) { 455 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 456 channel->outstanding_reqs); 457 } 458 459 free(channel->req_mem); 460 if (channel->bs_channel != NULL) { 461 spdk_bs_free_io_channel(channel->bs_channel); 462 } 463 } 464 465 static void 466 __send_request_direct(fs_request_fn fn, void *arg) 467 { 468 fn(arg); 469 } 470 471 static void 472 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 473 { 474 fs->bs = bs; 475 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 476 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 477 
fs->md_target.md_fs_channel->send_request = __send_request_direct; 478 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 479 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 480 481 pthread_mutex_lock(&g_cache_init_lock); 482 if (g_fs_count == 0) { 483 __initialize_cache(); 484 } 485 g_fs_count++; 486 pthread_mutex_unlock(&g_cache_init_lock); 487 } 488 489 static void 490 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 491 { 492 struct spdk_fs_request *req = ctx; 493 struct spdk_fs_cb_args *args = &req->args; 494 struct spdk_filesystem *fs = args->fs; 495 496 if (bserrno == 0) { 497 common_fs_bs_init(fs, bs); 498 } else { 499 free(fs); 500 fs = NULL; 501 } 502 503 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 504 free_fs_request(req); 505 } 506 507 static void 508 fs_conf_parse(void) 509 { 510 struct spdk_conf_section *sp; 511 512 sp = spdk_conf_find_section(NULL, "Blobfs"); 513 if (sp == NULL) { 514 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 515 return; 516 } 517 518 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 519 if (g_fs_cache_buffer_shift <= 0) { 520 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 521 } 522 } 523 524 static struct spdk_filesystem * 525 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 526 { 527 struct spdk_filesystem *fs; 528 529 fs = calloc(1, sizeof(*fs)); 530 if (fs == NULL) { 531 return NULL; 532 } 533 534 fs->bdev = dev; 535 fs->send_request = send_request_fn; 536 TAILQ_INIT(&fs->files); 537 538 fs->md_target.max_ops = 512; 539 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 540 sizeof(struct spdk_fs_channel), "blobfs_md"); 541 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 542 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 543 544 fs->sync_target.max_ops = 512; 545 
spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 546 sizeof(struct spdk_fs_channel), "blobfs_sync"); 547 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 548 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 549 550 fs->io_target.max_ops = 512; 551 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 552 sizeof(struct spdk_fs_channel), "blobfs_io"); 553 554 return fs; 555 } 556 557 static void 558 __wake_caller(void *arg, int fserrno) 559 { 560 struct spdk_fs_cb_args *args = arg; 561 562 args->rc = fserrno; 563 sem_post(args->sem); 564 } 565 566 void 567 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 568 fs_send_request_fn send_request_fn, 569 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 570 { 571 struct spdk_filesystem *fs; 572 struct spdk_fs_request *req; 573 struct spdk_fs_cb_args *args; 574 struct spdk_bs_opts opts = {}; 575 576 fs = fs_alloc(dev, send_request_fn); 577 if (fs == NULL) { 578 cb_fn(cb_arg, NULL, -ENOMEM); 579 return; 580 } 581 582 fs_conf_parse(); 583 584 req = alloc_fs_request(fs->md_target.md_fs_channel); 585 if (req == NULL) { 586 spdk_fs_free_io_channels(fs); 587 spdk_fs_io_device_unregister(fs); 588 cb_fn(cb_arg, NULL, -ENOMEM); 589 return; 590 } 591 592 args = &req->args; 593 args->fn.fs_op_with_handle = cb_fn; 594 args->arg = cb_arg; 595 args->fs = fs; 596 597 spdk_bs_opts_init(&opts); 598 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 599 if (opt) { 600 opts.cluster_sz = opt->cluster_sz; 601 } 602 spdk_bs_init(dev, &opts, init_cb, req); 603 } 604 605 static struct spdk_file * 606 file_alloc(struct spdk_filesystem *fs) 607 { 608 struct spdk_file *file; 609 610 file = calloc(1, sizeof(*file)); 611 if (file == NULL) { 612 return NULL; 613 } 614 615 file->tree = calloc(1, sizeof(*file->tree)); 616 if (file->tree == NULL) { 617 
free(file); 618 return NULL; 619 } 620 621 file->fs = fs; 622 TAILQ_INIT(&file->open_requests); 623 TAILQ_INIT(&file->sync_requests); 624 pthread_spin_init(&file->lock, 0); 625 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 626 file->priority = SPDK_FILE_PRIORITY_LOW; 627 return file; 628 } 629 630 static void fs_load_done(void *ctx, int bserrno); 631 632 static int 633 _handle_deleted_files(struct spdk_fs_request *req) 634 { 635 struct spdk_fs_cb_args *args = &req->args; 636 struct spdk_filesystem *fs = args->fs; 637 638 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 639 struct spdk_deleted_file *deleted_file; 640 641 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 642 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 643 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 644 free(deleted_file); 645 return 0; 646 } 647 648 return 1; 649 } 650 651 static void 652 fs_load_done(void *ctx, int bserrno) 653 { 654 struct spdk_fs_request *req = ctx; 655 struct spdk_fs_cb_args *args = &req->args; 656 struct spdk_filesystem *fs = args->fs; 657 658 /* The filesystem has been loaded. Now check if there are any files that 659 * were marked for deletion before last unload. Do not complete the 660 * fs_load callback until all of them have been deleted on disk. 661 */ 662 if (_handle_deleted_files(req) == 0) { 663 /* We found a file that's been marked for deleting but not actually 664 * deleted yet. This function will get called again once the delete 665 * operation is completed. 
666 */ 667 return; 668 } 669 670 args->fn.fs_op_with_handle(args->arg, fs, 0); 671 free_fs_request(req); 672 673 } 674 675 static void 676 _file_build_trace_arg_name(struct spdk_file *f) 677 { 678 f->trace_arg_name = 0; 679 memcpy(&f->trace_arg_name, f->name, 680 spdk_min(sizeof(f->trace_arg_name), strlen(f->name))); 681 } 682 683 static void 684 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 685 { 686 struct spdk_fs_request *req = ctx; 687 struct spdk_fs_cb_args *args = &req->args; 688 struct spdk_filesystem *fs = args->fs; 689 uint64_t *length; 690 const char *name; 691 uint32_t *is_deleted; 692 size_t value_len; 693 694 if (rc < 0) { 695 args->fn.fs_op_with_handle(args->arg, fs, rc); 696 free_fs_request(req); 697 return; 698 } 699 700 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 701 if (rc < 0) { 702 args->fn.fs_op_with_handle(args->arg, fs, rc); 703 free_fs_request(req); 704 return; 705 } 706 707 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 708 if (rc < 0) { 709 args->fn.fs_op_with_handle(args->arg, fs, rc); 710 free_fs_request(req); 711 return; 712 } 713 714 assert(value_len == 8); 715 716 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 717 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 718 if (rc < 0) { 719 struct spdk_file *f; 720 721 f = file_alloc(fs); 722 if (f == NULL) { 723 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 724 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 725 free_fs_request(req); 726 return; 727 } 728 729 f->name = strdup(name); 730 _file_build_trace_arg_name(f); 731 f->blobid = spdk_blob_get_id(blob); 732 f->length = *length; 733 f->length_flushed = *length; 734 f->length_xattr = *length; 735 f->append_pos = *length; 736 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 737 } else { 738 struct 
spdk_deleted_file *deleted_file; 739 740 deleted_file = calloc(1, sizeof(*deleted_file)); 741 if (deleted_file == NULL) { 742 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 743 free_fs_request(req); 744 return; 745 } 746 deleted_file->id = spdk_blob_get_id(blob); 747 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 748 } 749 } 750 751 static void 752 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 753 { 754 struct spdk_fs_request *req = ctx; 755 struct spdk_fs_cb_args *args = &req->args; 756 struct spdk_filesystem *fs = args->fs; 757 struct spdk_bs_type bstype; 758 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 759 static const struct spdk_bs_type zeros; 760 761 if (bserrno != 0) { 762 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 763 free_fs_request(req); 764 spdk_fs_free_io_channels(fs); 765 spdk_fs_io_device_unregister(fs); 766 return; 767 } 768 769 bstype = spdk_bs_get_bstype(bs); 770 771 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 772 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 773 spdk_bs_set_bstype(bs, blobfs_type); 774 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 775 SPDK_ERRLOG("not blobfs\n"); 776 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 777 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 778 free_fs_request(req); 779 spdk_fs_free_io_channels(fs); 780 spdk_fs_io_device_unregister(fs); 781 return; 782 } 783 784 common_fs_bs_init(fs, bs); 785 fs_load_done(req, 0); 786 } 787 788 static void 789 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 790 { 791 assert(fs != NULL); 792 spdk_io_device_unregister(&fs->md_target, NULL); 793 spdk_io_device_unregister(&fs->sync_target, NULL); 794 spdk_io_device_unregister(&fs->io_target, NULL); 795 free(fs); 796 } 797 798 static void 799 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 800 { 801 assert(fs != NULL); 802 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 803 
spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 804 } 805 806 void 807 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 808 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 809 { 810 struct spdk_filesystem *fs; 811 struct spdk_fs_cb_args *args; 812 struct spdk_fs_request *req; 813 struct spdk_bs_opts bs_opts; 814 815 fs = fs_alloc(dev, send_request_fn); 816 if (fs == NULL) { 817 cb_fn(cb_arg, NULL, -ENOMEM); 818 return; 819 } 820 821 fs_conf_parse(); 822 823 req = alloc_fs_request(fs->md_target.md_fs_channel); 824 if (req == NULL) { 825 spdk_fs_free_io_channels(fs); 826 spdk_fs_io_device_unregister(fs); 827 cb_fn(cb_arg, NULL, -ENOMEM); 828 return; 829 } 830 831 args = &req->args; 832 args->fn.fs_op_with_handle = cb_fn; 833 args->arg = cb_arg; 834 args->fs = fs; 835 TAILQ_INIT(&args->op.fs_load.deleted_files); 836 spdk_bs_opts_init(&bs_opts); 837 bs_opts.iter_cb_fn = iter_cb; 838 bs_opts.iter_cb_arg = req; 839 spdk_bs_load(dev, &bs_opts, load_cb, req); 840 } 841 842 static void 843 unload_cb(void *ctx, int bserrno) 844 { 845 struct spdk_fs_request *req = ctx; 846 struct spdk_fs_cb_args *args = &req->args; 847 struct spdk_filesystem *fs = args->fs; 848 struct spdk_file *file, *tmp; 849 850 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 851 TAILQ_REMOVE(&fs->files, file, tailq); 852 cache_free_buffers(file); 853 free(file->name); 854 free(file->tree); 855 free(file); 856 } 857 858 pthread_mutex_lock(&g_cache_init_lock); 859 g_fs_count--; 860 if (g_fs_count == 0) { 861 __free_cache(); 862 } 863 pthread_mutex_unlock(&g_cache_init_lock); 864 865 args->fn.fs_op(args->arg, bserrno); 866 free(req); 867 868 spdk_fs_io_device_unregister(fs); 869 } 870 871 void 872 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 873 { 874 struct spdk_fs_request *req; 875 struct spdk_fs_cb_args *args; 876 877 /* 878 * We must free the md_channel before unloading the blobstore, so just 879 * allocate this request 
from the general heap. 880 */ 881 req = calloc(1, sizeof(*req)); 882 if (req == NULL) { 883 cb_fn(cb_arg, -ENOMEM); 884 return; 885 } 886 887 args = &req->args; 888 args->fn.fs_op = cb_fn; 889 args->arg = cb_arg; 890 args->fs = fs; 891 892 spdk_fs_free_io_channels(fs); 893 spdk_bs_unload(fs->bs, unload_cb, req); 894 } 895 896 static struct spdk_file * 897 fs_find_file(struct spdk_filesystem *fs, const char *name) 898 { 899 struct spdk_file *file; 900 901 TAILQ_FOREACH(file, &fs->files, tailq) { 902 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 903 return file; 904 } 905 } 906 907 return NULL; 908 } 909 910 void 911 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 912 spdk_file_stat_op_complete cb_fn, void *cb_arg) 913 { 914 struct spdk_file_stat stat; 915 struct spdk_file *f = NULL; 916 917 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 918 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 919 return; 920 } 921 922 f = fs_find_file(fs, name); 923 if (f != NULL) { 924 stat.blobid = f->blobid; 925 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 926 cb_fn(cb_arg, &stat, 0); 927 return; 928 } 929 930 cb_fn(cb_arg, NULL, -ENOENT); 931 } 932 933 static void 934 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 935 { 936 struct spdk_fs_request *req = arg; 937 struct spdk_fs_cb_args *args = &req->args; 938 939 args->rc = fserrno; 940 if (fserrno == 0) { 941 memcpy(args->arg, stat, sizeof(*stat)); 942 } 943 sem_post(args->sem); 944 } 945 946 static void 947 __file_stat(void *arg) 948 { 949 struct spdk_fs_request *req = arg; 950 struct spdk_fs_cb_args *args = &req->args; 951 952 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 953 args->fn.stat_op, req); 954 } 955 956 int 957 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 958 const char *name, struct spdk_file_stat *stat) 959 { 960 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 961 struct spdk_fs_request *req; 962 int rc; 963 964 req = alloc_fs_request(channel); 965 if (req == NULL) { 966 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 967 return -ENOMEM; 968 } 969 970 req->args.fs = fs; 971 req->args.op.stat.name = name; 972 req->args.fn.stat_op = __copy_stat; 973 req->args.arg = stat; 974 req->args.sem = &channel->sem; 975 channel->send_request(__file_stat, req); 976 sem_wait(&channel->sem); 977 978 rc = req->args.rc; 979 free_fs_request(req); 980 981 return rc; 982 } 983 984 static void 985 fs_create_blob_close_cb(void *ctx, int bserrno) 986 { 987 int rc; 988 struct spdk_fs_request *req = ctx; 989 struct spdk_fs_cb_args *args = &req->args; 990 991 rc = args->rc ? 
args->rc : bserrno; 992 args->fn.file_op(args->arg, rc); 993 free_fs_request(req); 994 } 995 996 static void 997 fs_create_blob_resize_cb(void *ctx, int bserrno) 998 { 999 struct spdk_fs_request *req = ctx; 1000 struct spdk_fs_cb_args *args = &req->args; 1001 struct spdk_file *f = args->file; 1002 struct spdk_blob *blob = args->op.create.blob; 1003 uint64_t length = 0; 1004 1005 args->rc = bserrno; 1006 if (bserrno) { 1007 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1008 return; 1009 } 1010 1011 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1012 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1013 1014 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1015 } 1016 1017 static void 1018 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1019 { 1020 struct spdk_fs_request *req = ctx; 1021 struct spdk_fs_cb_args *args = &req->args; 1022 1023 if (bserrno) { 1024 args->fn.file_op(args->arg, bserrno); 1025 free_fs_request(req); 1026 return; 1027 } 1028 1029 args->op.create.blob = blob; 1030 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1031 } 1032 1033 static void 1034 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1035 { 1036 struct spdk_fs_request *req = ctx; 1037 struct spdk_fs_cb_args *args = &req->args; 1038 struct spdk_file *f = args->file; 1039 1040 if (bserrno) { 1041 args->fn.file_op(args->arg, bserrno); 1042 free_fs_request(req); 1043 return; 1044 } 1045 1046 f->blobid = blobid; 1047 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1048 } 1049 1050 void 1051 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1052 spdk_file_op_complete cb_fn, void *cb_arg) 1053 { 1054 struct spdk_file *file; 1055 struct spdk_fs_request *req; 1056 struct spdk_fs_cb_args *args; 1057 1058 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1059 cb_fn(cb_arg, -ENAMETOOLONG); 1060 return; 1061 } 1062 1063 file = 
fs_find_file(fs, name); 1064 if (file != NULL) { 1065 cb_fn(cb_arg, -EEXIST); 1066 return; 1067 } 1068 1069 file = file_alloc(fs); 1070 if (file == NULL) { 1071 SPDK_ERRLOG("Cannot allocate new file for creation\n"); 1072 cb_fn(cb_arg, -ENOMEM); 1073 return; 1074 } 1075 1076 req = alloc_fs_request(fs->md_target.md_fs_channel); 1077 if (req == NULL) { 1078 SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name); 1079 cb_fn(cb_arg, -ENOMEM); 1080 return; 1081 } 1082 1083 args = &req->args; 1084 args->file = file; 1085 args->fn.file_op = cb_fn; 1086 args->arg = cb_arg; 1087 1088 file->name = strdup(name); 1089 _file_build_trace_arg_name(file); 1090 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1091 } 1092 1093 static void 1094 __fs_create_file_done(void *arg, int fserrno) 1095 { 1096 struct spdk_fs_request *req = arg; 1097 struct spdk_fs_cb_args *args = &req->args; 1098 1099 __wake_caller(args, fserrno); 1100 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1101 } 1102 1103 static void 1104 __fs_create_file(void *arg) 1105 { 1106 struct spdk_fs_request *req = arg; 1107 struct spdk_fs_cb_args *args = &req->args; 1108 1109 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1110 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1111 } 1112 1113 int 1114 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1115 { 1116 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1117 struct spdk_fs_request *req; 1118 struct spdk_fs_cb_args *args; 1119 int rc; 1120 1121 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1122 1123 req = alloc_fs_request(channel); 1124 if (req == NULL) { 1125 SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name); 1126 return -ENOMEM; 1127 } 1128 1129 args = &req->args; 1130 args->fs = fs; 1131 args->op.create.name = name; 1132 args->sem = &channel->sem; 1133 
fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Blob-open completion: attach the blob to the file, then complete and free
 * every open request that queued on the file while the open was in flight.
 * NOTE(review): f->blob is assigned even when bserrno != 0 — presumably the
 * callers treat a failed open's handle as unusable; confirm. */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

/* Second stage of an open: the file now exists (either it already did, or
 * the CREATE-flag path just created it).  Take a reference and either start
 * the blob open, complete immediately from a cached blob handle, or queue
 * behind an in-flight open. */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request. When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

/* Asynchronous open.  With SPDK_BLOBFS_OPEN_CREATE the file is created if
 * missing; a file pending deletion (is_deleted) reports -ENOENT. */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

/* Completion for the sync-open path: publish the file handle and wake
 * the blocked caller.
 * NOTE(review): the log reads args after __wake_caller(); the caller may
 * already have freed the request by then — confirm/reorder. */
static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
}

/* Runs on the metadata thread on behalf of spdk_fs_open_file(). */
static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

/* Synchronous open: dispatch to the metadata thread and block until done. */
int
spdk_fs_open_file(struct spdk_filesystem *fs, struct
spdk_fs_thread_ctx *ctx, 1263 const char *name, uint32_t flags, struct spdk_file **file) 1264 { 1265 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1266 struct spdk_fs_request *req; 1267 struct spdk_fs_cb_args *args; 1268 int rc; 1269 1270 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1271 1272 req = alloc_fs_request(channel); 1273 if (req == NULL) { 1274 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1275 return -ENOMEM; 1276 } 1277 1278 args = &req->args; 1279 args->fs = fs; 1280 args->op.open.name = name; 1281 args->op.open.flags = flags; 1282 args->sem = &channel->sem; 1283 fs->send_request(__fs_open_file, req); 1284 sem_wait(&channel->sem); 1285 rc = args->rc; 1286 if (rc == 0) { 1287 *file = args->file; 1288 } else { 1289 *file = NULL; 1290 } 1291 free_fs_request(req); 1292 1293 return rc; 1294 } 1295 1296 static void 1297 fs_rename_blob_close_cb(void *ctx, int bserrno) 1298 { 1299 struct spdk_fs_request *req = ctx; 1300 struct spdk_fs_cb_args *args = &req->args; 1301 1302 args->fn.fs_op(args->arg, bserrno); 1303 free_fs_request(req); 1304 } 1305 1306 static void 1307 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1308 { 1309 struct spdk_fs_request *req = ctx; 1310 struct spdk_fs_cb_args *args = &req->args; 1311 const char *new_name = args->op.rename.new_name; 1312 1313 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1314 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1315 } 1316 1317 static void 1318 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1319 { 1320 struct spdk_fs_cb_args *args = &req->args; 1321 struct spdk_file *f; 1322 1323 f = fs_find_file(args->fs, args->op.rename.old_name); 1324 if (f == NULL) { 1325 args->fn.fs_op(args->arg, -ENOENT); 1326 free_fs_request(req); 1327 return; 1328 } 1329 1330 free(f->name); 1331 f->name = strdup(args->op.rename.new_name); 1332 _file_build_trace_arg_name(f); 1333 args->file = f; 1334 spdk_bs_open_blob(args->fs->bs, 
f->blobid, fs_rename_blob_open_cb, req);
}

/* The old destination file (if any) has been deleted; now perform the
 * actual metadata rename. */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	__spdk_fs_md_rename_file(arg);
}

/* Asynchronous rename.  If new_name already exists, that file is deleted
 * first and the rename proceeds from the delete completion. */
void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		__spdk_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file. So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

/* Completion for the sync rename path: wake the blocked caller. */
static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

/* Runs on the metadata thread on behalf of spdk_fs_rename_file(). */
static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

/* Synchronous rename: dispatch to the metadata thread and block until done. */
int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

/* Blob-delete (or deferred-delete metadata sync) completed: report and
 * release the request. */
static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Asynchronous delete.  If the file is still open (ref_count > 0) it is
 * only marked is_deleted and removed at last close; otherwise the in-memory
 * record is freed and the backing blob deleted immediately. */
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	if (strnlen(name,
SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1455 cb_fn(cb_arg, -ENAMETOOLONG); 1456 return; 1457 } 1458 1459 f = fs_find_file(fs, name); 1460 if (f == NULL) { 1461 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1462 cb_fn(cb_arg, -ENOENT); 1463 return; 1464 } 1465 1466 req = alloc_fs_request(fs->md_target.md_fs_channel); 1467 if (req == NULL) { 1468 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1469 cb_fn(cb_arg, -ENOMEM); 1470 return; 1471 } 1472 1473 args = &req->args; 1474 args->fn.file_op = cb_fn; 1475 args->arg = cb_arg; 1476 1477 if (f->ref_count > 0) { 1478 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1479 f->is_deleted = true; 1480 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1481 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1482 return; 1483 } 1484 1485 TAILQ_REMOVE(&fs->files, f, tailq); 1486 1487 /* It's safe to free cache buffers here while another thread 1488 * is trying to free the same file cache buffers, because each 1489 * thread will get the g_caches_lock first. 
1490 */ 1491 cache_free_buffers(f); 1492 1493 blobid = f->blobid; 1494 1495 free(f->name); 1496 free(f->tree); 1497 free(f); 1498 1499 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1500 } 1501 1502 static uint64_t 1503 fs_name_to_uint64(const char *name) 1504 { 1505 uint64_t result = 0; 1506 memcpy(&result, name, spdk_min(sizeof(result), strlen(name))); 1507 return result; 1508 } 1509 1510 static void 1511 __fs_delete_file_done(void *arg, int fserrno) 1512 { 1513 struct spdk_fs_request *req = arg; 1514 struct spdk_fs_cb_args *args = &req->args; 1515 1516 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1517 __wake_caller(args, fserrno); 1518 } 1519 1520 static void 1521 __fs_delete_file(void *arg) 1522 { 1523 struct spdk_fs_request *req = arg; 1524 struct spdk_fs_cb_args *args = &req->args; 1525 1526 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1527 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1528 } 1529 1530 int 1531 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1532 const char *name) 1533 { 1534 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1535 struct spdk_fs_request *req; 1536 struct spdk_fs_cb_args *args; 1537 int rc; 1538 1539 req = alloc_fs_request(channel); 1540 if (req == NULL) { 1541 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name); 1542 return -ENOMEM; 1543 } 1544 1545 args = &req->args; 1546 args->fs = fs; 1547 args->op.delete.name = name; 1548 args->sem = &channel->sem; 1549 fs->send_request(__fs_delete_file, req); 1550 sem_wait(&channel->sem); 1551 rc = args->rc; 1552 free_fs_request(req); 1553 1554 return rc; 1555 } 1556 1557 spdk_fs_iter 1558 spdk_fs_iter_first(struct spdk_filesystem *fs) 1559 { 1560 struct spdk_file *f; 1561 1562 f = TAILQ_FIRST(&fs->files); 1563 return f; 1564 } 1565 1566 spdk_fs_iter 1567 
spdk_fs_iter_next(spdk_fs_iter iter)
{
	/* Advance the iterator; a NULL iterator stays NULL. */
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

/* Accessor: the file's name (owned by the file, do not free). */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

/* Logical file length: the larger of the persisted length and the current
 * append position (unflushed appends count). */
uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	uint64_t length;

	assert(file != NULL);

	length = file->append_pos >= file->length ? file->append_pos : file->length;
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
	return length;
}

/* Truncate finished (metadata synced): report and release the request. */
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Blob resize finished: record the new length in the "length" xattr and
 * in-memory state, clamp append_pos, then sync blob metadata. */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

/* Round a byte count up to whole clusters. */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

/* Asynchronous truncate: resize the blob to the cluster count covering
 * `length`, then update length metadata in the resize callback.
 * A no-op (immediate success) when length is unchanged. */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

/* Runs on the metadata thread on behalf of spdk_file_truncate(); the
 * completion fn (__wake_caller) releases the blocked caller. */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

/* Synchronous truncate: dispatch to the metadata thread and block. */
int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	/* The async completion is __wake_caller itself, with args as cb_arg. */
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Final r/w completion: free the bounce buffer, report, release request. */
static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Gather: copy iovec contents into a flat buffer of at most buf_len bytes. */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/* Scatter: copy a flat buffer out into an iovec, at most buf_len bytes. */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/* Blob read of the pinned (LBA-aligned) buffer finished.  For a read,
 * scatter the requested bytes to the caller's iovec and complete.  For a
 * read-modify-write, merge the caller's data into the buffer and issue the
 * write. */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	/* Offset of the caller's data within the aligned pin buffer
	 * (blocklen is a power of two, so the mask yields offset % blocklen). */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

/* Start the blob read that backs both the read path and the
 * read-modify-write path; short-circuit to completion on prior error. */
static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

/* Translate a byte range into the covering LBA range of the blobstore's
 * io-unit size. */
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) /
*lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

/* True when both offset and length fall on io-unit boundaries, i.e. no
 * read-modify-write is needed. */
static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

/* Copy the caller's iovec descriptors into the request (the request owns
 * iovcnt slots, sized at allocation time). */
static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

/* Common vectored read/write implementation: stage the I/O through an
 * LBA-aligned DMA "pin" buffer; unaligned writes become read-modify-write. */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past EOF are rejected outright; writes may extend the file. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf ==
NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Extending write: grow the file first, then fall into the
		 * read-modify-write path from the truncate completion. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no read needed, write directly. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		/* Read, or unaligned write (read-modify-write). */
		__do_blob_read(req, 0);
	}
}

/* Single-buffer wrapper over __readvwritev(). */
static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

/* Asynchronous single-buffer write. */
void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous vectored write. */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous single-buffer read. */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

/* Asynchronous vectored read. */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

/* Allocate an I/O channel bound to this filesystem's blobstore; requests on
 * it are dispatched directly (no thread hop). */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

/* Release an I/O channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

/* Allocate a context for the synchronous (blocking) API; it embeds a
 * private channel with a 512-request pool.
 * NOTE(review): the return value of _spdk_fs_channel_create() is not
 * checked — if the pool allocation inside it can fail, ctx would be
 * returned half-initialized.  Confirm against its definition. */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	_spdk_fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;
	pthread_spin_init(&ctx->ch.lock, 0);

	return ctx;
}


/* Free a sync-API context, first draining any outstanding requests. */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		/* Busy-wait with a 1ms sleep until in-flight requests drain. */
		usleep(1000);
	}

	_spdk_fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

/* Set the global cache size (MiB).  Only allowed before the cache pool is
 * created (or after it has been freed). */
int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

/* Current global cache size in MiB. */
uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some cache buffers of this file, this function must
 * be called while holding g_caches_lock.
 * Returns 0 if buffers were reclaimed, -1 if the file lock was contended
 * or the file had nothing to free. */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	spdk_tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

/* Get a buffer from the global cache pool, evicting other files' cached
 * data if the pool is empty.  Eviction passes run in priority order:
 * 1) low-priority files not open for writing, 2) any file not open for
 * writing, 3) any file except the requester.  Returns NULL if nothing
 * could be reclaimed. */
static void *
alloc_cache_memory_buffer(struct spdk_file *context)
{
	struct spdk_file *file, *tmp;
	void *buf;
	int rc;

	buf = spdk_mempool_get(g_cache_pool);
	if (buf != NULL) {
		return buf;
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW &&
		    file != context) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			buf = spdk_mempool_get(g_cache_pool);
			if (buf != NULL) {
				pthread_spin_unlock(&g_caches_lock);
				return buf;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file != context) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			buf = spdk_mempool_get(g_cache_pool);
			if (buf != NULL) {
				pthread_spin_unlock(&g_caches_lock);
				return buf;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (file != context) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			buf = spdk_mempool_get(g_cache_pool);
			if (buf != NULL) {
				pthread_spin_unlock(&g_caches_lock);
				return buf;
			}
			break;
		}
	}
pthread_spin_unlock(&g_caches_lock);

	return NULL;
}

/* Allocate a cache buffer for `offset` of this file and link it into the
 * file's buffer tree (registering the file in g_caches on first buffer).
 * Returns NULL if memory could not be obtained. */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
		return NULL;
	}

	buf->buf = alloc_cache_memory_buffer(file);
	while (buf->buf == NULL) {
		/*
		 * TODO: alloc_cache_memory_buffer() should eventually free
		 * some buffers. Need a more sophisticated check here, instead
		 * of just bailing if 100 tries does not result in getting a
		 * free buffer. This will involve using the sync channel's
		 * semaphore to block until a buffer becomes available.
		 */
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		buf->buf = alloc_cache_memory_buffer(file);
	}

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		/* First cached buffer for this file: make it visible to the
		 * global eviction scan. */
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	file->tree = spdk_tree_insert_buffer(file->tree, buf);
	pthread_spin_unlock(&g_caches_lock);

	return buf;
}

/* Allocate the next append buffer at the current append position and make
 * it the file's "last" (active) buffer. */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	/* The "length" xattr write for one sync request has been persisted:
	 * record it, dequeue the request, complete it, and then look for the
	 * next satisfiable sync request. */
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Invoke the user callback outside the lock; re-take it only to
	 * return the request to the channel pool. */
	sync_args->fn.file_op(sync_args->arg, bserrno);
	pthread_spin_lock(&file->lock);
	free_fs_request(sync_req);
	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);
}

/* If any queued sync request is now fully covered by flushed data (and no
 * xattr update is already in flight for it), persist the flushed length via
 * the "length" xattr and a metadata sync. */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* Find the first satisfiable request; sync_req is NULL if the loop
	 * completes without a break. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

/* One flush write completed: account the flushed bytes, satisfy syncs, and
 * continue flushing from the new flush frontier. */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}

/* Flush engine: write the next unflushed span of the cache buffer at the
 * flush frontier, or stop (recycling the request) when there is nothing
 * flushable.  Partially filled buffers are only flushed when a sync is
 * pending. */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 *  progress, or the next buffer is partially filled but there's no
		 *  outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 *  more data after that is completed, or a partial buffer will get flushed
		 *  when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache. So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 *  outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

/* Blob extension finished (metadata synced): wake the blocked caller. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/* Resize finished: persist the new cluster count via a metadata sync, or
 * wake the caller immediately on error. */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Runs on the metadata thread: grow the blob to the requested cluster
 * count. */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Cache-bypass I/O completed: wake the blocked sync caller and release the
 * request. */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

/* Runs on the metadata thread: issue a cache-bypass read or write on the
 * filesystem's sync I/O channel. */
static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2398 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2399 __rw_from_file_done, req); 2400 } 2401 } 2402 2403 static int 2404 __send_rw_from_file(struct spdk_file *file, void *payload, 2405 uint64_t offset, uint64_t length, bool is_read, 2406 struct spdk_fs_channel *channel) 2407 { 2408 struct spdk_fs_request *req; 2409 struct spdk_fs_cb_args *args; 2410 2411 req = alloc_fs_request_with_iov(channel, 1); 2412 if (req == NULL) { 2413 sem_post(&channel->sem); 2414 return -ENOMEM; 2415 } 2416 2417 args = &req->args; 2418 args->file = file; 2419 args->sem = &channel->sem; 2420 args->iovs[0].iov_base = payload; 2421 args->iovs[0].iov_len = (size_t)length; 2422 args->op.rw.offset = offset; 2423 args->op.rw.is_read = is_read; 2424 file->fs->send_request(__rw_from_file, req); 2425 return 0; 2426 } 2427 2428 int 2429 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2430 void *payload, uint64_t offset, uint64_t length) 2431 { 2432 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2433 struct spdk_fs_request *flush_req; 2434 uint64_t rem_length, copy, blob_size, cluster_sz; 2435 uint32_t cache_buffers_filled = 0; 2436 uint8_t *cur_payload; 2437 struct cache_buffer *last; 2438 2439 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2440 2441 if (length == 0) { 2442 return 0; 2443 } 2444 2445 if (offset != file->append_pos) { 2446 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2447 return -EINVAL; 2448 } 2449 2450 pthread_spin_lock(&file->lock); 2451 file->open_for_writing = true; 2452 2453 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2454 cache_append_buffer(file); 2455 } 2456 2457 if (file->last == NULL) { 2458 int rc; 2459 2460 file->append_pos += length; 2461 pthread_spin_unlock(&file->lock); 2462 rc = __send_rw_from_file(file, payload, offset, length, false, 
channel); 2463 sem_wait(&channel->sem); 2464 return rc; 2465 } 2466 2467 blob_size = __file_get_blob_size(file); 2468 2469 if ((offset + length) > blob_size) { 2470 struct spdk_fs_cb_args extend_args = {}; 2471 2472 cluster_sz = file->fs->bs_opts.cluster_sz; 2473 extend_args.sem = &channel->sem; 2474 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2475 extend_args.file = file; 2476 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2477 pthread_spin_unlock(&file->lock); 2478 file->fs->send_request(__file_extend_blob, &extend_args); 2479 sem_wait(&channel->sem); 2480 if (extend_args.rc) { 2481 return extend_args.rc; 2482 } 2483 } 2484 2485 flush_req = alloc_fs_request(channel); 2486 if (flush_req == NULL) { 2487 pthread_spin_unlock(&file->lock); 2488 return -ENOMEM; 2489 } 2490 2491 last = file->last; 2492 rem_length = length; 2493 cur_payload = payload; 2494 while (rem_length > 0) { 2495 copy = last->buf_size - last->bytes_filled; 2496 if (copy > rem_length) { 2497 copy = rem_length; 2498 } 2499 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2500 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2501 file->append_pos += copy; 2502 if (file->length < file->append_pos) { 2503 file->length = file->append_pos; 2504 } 2505 cur_payload += copy; 2506 last->bytes_filled += copy; 2507 rem_length -= copy; 2508 if (last->bytes_filled == last->buf_size) { 2509 cache_buffers_filled++; 2510 last = cache_append_buffer(file); 2511 if (last == NULL) { 2512 BLOBFS_TRACE(file, "nomem\n"); 2513 free_fs_request(flush_req); 2514 pthread_spin_unlock(&file->lock); 2515 return -ENOMEM; 2516 } 2517 } 2518 } 2519 2520 pthread_spin_unlock(&file->lock); 2521 2522 if (cache_buffers_filled == 0) { 2523 free_fs_request(flush_req); 2524 return 0; 2525 } 2526 2527 flush_req->args.file = file; 2528 file->fs->send_request(__file_flush, flush_req); 2529 return 0; 2530 } 2531 2532 
/*
 * Completion callback for a readahead blob read: mark the cache buffer as
 * populated.  bytes_flushed is set equal to bytes_filled because the data
 * came from disk and therefore needs no flushing.
 * NOTE(review): bserrno is ignored here, so a failed readahead would leave
 * the buffer marked as filled - confirm this is intentional.
 */
static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}

/* Runs on the fs request thread: issue the blob read for a readahead request. */
static void
__readahead(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, req);
}

/* Round offset up to the start of the next cache buffer. */
static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

/*
 * Start a readahead for the cache buffer following the given offset, unless
 * that buffer is already cached, lies at or beyond EOF, or resources (fs
 * request or cache buffer) cannot be allocated.  Readahead failures are
 * silent - reads simply fall back to the non-cached path.
 */
static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	/* Read only up to EOF when the final buffer is partially used. */
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}

/*
 * Read up to length bytes starting at offset.  Cached data is copied
 * directly; uncached ranges are read synchronously from the blob via
 * __send_rw_from_file().  Sequential access patterns trigger readahead.
 * Returns the number of bytes read (possibly clamped to EOF), or a negative
 * errno if a non-cached sub-read could not be issued.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the current end of file. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequentiality; enough consecutive bytes read triggers readahead. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		/* Process at most one cache buffer's worth per iteration. */
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = spdk_tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Not cached - issue a synchronous read from the blob. */
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
			pthread_spin_lock(&file->lock);
			if (rc == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Drop a cache buffer once it has been consumed to its end. */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				pthread_spin_lock(&g_caches_lock);
				spdk_tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					TAILQ_REMOVE(&g_caches, file, cache_tailq);
				}
				pthread_spin_unlock(&g_caches_lock);
			}
		}

		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for all outstanding non-cached sub-reads to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (rc == 0) {
		return final_length;
	} else {
		return rc;
	}
}

/*
 * Common sync implementation: queue a sync request recording the current
 * append position and kick off a cache flush.  cb_fn fires once all data up
 * to that position has been flushed and the length xattr updated (handled by
 * the flush/sync machinery elsewhere in this file).  Completes immediately
 * with 0 if the file is already synced, or with -ENOMEM on allocation
 * failure.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}

/* Synchronous sync: blocks the caller until the sync completes; returns its rc. */
int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

/* Asynchronous sync: cb_fn is invoked on completion. */
void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

/* Record a priority hint on the file (consumed elsewhere in blobfs). */
void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;

}

/*
 * Close routines
 */

/*
 * Completion for the blob close at the end of a file close.  If the file was
 * flagged deleted while still open, the deferred delete is performed now
 * before completing the caller's callback.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Drop one reference to the file.  Only the final reference actually closes
 * the underlying blob; attempting to close with no references outstanding
 * completes with -EBADF.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach and close the backing blob. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

/* Called once the pre-close sync finishes; proceeds with the actual close. */
static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

/* Asynchronous close: sync the file first, then release the reference. */
void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

/* Runs on the fs request thread on behalf of spdk_file_close(). */
static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

/* Synchronous close: syncs the file, then blocks until the close completes. */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

/*
 * Copy the file's unique id (its blob id) into the supplied buffer.
 * Returns the id size on success or -EINVAL if the buffer is too small.
 */
int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

/*
 * Release all cache buffers owned by the file and remove it from the global
 * cache list.  Lock order is file->lock then g_caches_lock, matching the
 * order used in spdk_file_read().  No-op if the file has no cached buffers.
 */
static void
cache_free_buffers(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&g_caches_lock);
		pthread_spin_unlock(&file->lock);
		return;
	}
	spdk_tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* spdk_tree_free_buffers() may keep the root node, but it must be empty now. */
	assert(file->tree->present_mask == 0);
	file->last = NULL;
	pthread_spin_unlock(&g_caches_lock);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)