1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk_internal/lvolstore.h" 8 #include "spdk/log.h" 9 #include "spdk/string.h" 10 #include "spdk/thread.h" 11 #include "spdk/blob_bdev.h" 12 #include "spdk/tree.h" 13 #include "spdk/util.h" 14 15 /* Default blob channel opts for lvol */ 16 #define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512 17 18 #define LVOL_NAME "name" 19 20 SPDK_LOG_REGISTER_COMPONENT(lvol) 21 22 struct spdk_lvs_degraded_lvol_set { 23 struct spdk_lvol_store *lvol_store; 24 const void *esnap_id; 25 uint32_t id_len; 26 TAILQ_HEAD(degraded_lvols, spdk_lvol) lvols; 27 RB_ENTRY(spdk_lvs_degraded_lvol_set) node; 28 }; 29 30 static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores); 31 static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER; 32 33 static inline int lvs_opts_copy(const struct spdk_lvs_opts *src, struct spdk_lvs_opts *dst); 34 static int lvs_esnap_bs_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob, 35 const void *esnap_id, uint32_t id_len, 36 struct spdk_bs_dev **_bs_dev); 37 static struct spdk_lvol *lvs_get_lvol_by_blob_id(struct spdk_lvol_store *lvs, spdk_blob_id blob_id); 38 static void lvs_degraded_lvol_set_add(struct spdk_lvs_degraded_lvol_set *degraded_set, 39 struct spdk_lvol *lvol); 40 static void lvs_degraded_lvol_set_remove(struct spdk_lvs_degraded_lvol_set *degraded_set, 41 struct spdk_lvol *lvol); 42 43 static int 44 add_lvs_to_list(struct spdk_lvol_store *lvs) 45 { 46 struct spdk_lvol_store *tmp; 47 bool name_conflict = false; 48 49 pthread_mutex_lock(&g_lvol_stores_mutex); 50 TAILQ_FOREACH(tmp, &g_lvol_stores, link) { 51 if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) { 52 name_conflict = true; 53 break; 54 } 55 } 56 if (!name_conflict) { 57 lvs->on_list = true; 58 TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link); 59 } 60 pthread_mutex_unlock(&g_lvol_stores_mutex); 61 62 return name_conflict ? -1 : 0; 63 } 64 65 static struct spdk_lvol_store * 66 lvs_alloc(void) 67 { 68 struct spdk_lvol_store *lvs; 69 70 lvs = calloc(1, sizeof(*lvs)); 71 if (lvs == NULL) { 72 return NULL; 73 } 74 75 TAILQ_INIT(&lvs->lvols); 76 TAILQ_INIT(&lvs->pending_lvols); 77 TAILQ_INIT(&lvs->retry_open_lvols); 78 79 lvs->load_esnaps = false; 80 RB_INIT(&lvs->degraded_lvol_sets_tree); 81 lvs->thread = spdk_get_thread(); 82 83 return lvs; 84 } 85 86 static void 87 lvs_free(struct spdk_lvol_store *lvs) 88 { 89 pthread_mutex_lock(&g_lvol_stores_mutex); 90 if (lvs->on_list) { 91 TAILQ_REMOVE(&g_lvol_stores, lvs, link); 92 } 93 pthread_mutex_unlock(&g_lvol_stores_mutex); 94 95 assert(RB_EMPTY(&lvs->degraded_lvol_sets_tree)); 96 97 free(lvs); 98 } 99 100 static struct spdk_lvol * 101 lvol_alloc(struct spdk_lvol_store *lvs, const char *name, bool thin_provision, 102 enum lvol_clear_method clear_method) 103 { 104 struct spdk_lvol *lvol; 105 106 lvol = calloc(1, sizeof(*lvol)); 107 if (lvol == NULL) { 108 return NULL; 109 } 110 111 lvol->lvol_store = lvs; 112 lvol->clear_method = (enum blob_clear_method)clear_method; 113 snprintf(lvol->name, sizeof(lvol->name), "%s", name); 114 spdk_uuid_generate(&lvol->uuid); 115 spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid); 116 spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->uuid_str), &lvol->uuid); 117 118 TAILQ_INSERT_TAIL(&lvs->pending_lvols, lvol, link); 119 120 return lvol; 121 } 122 123 static void 124 lvol_free(struct spdk_lvol *lvol) 125 { 126 free(lvol); 127 } 128 129 static void 130 lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno) 131 { 132 struct spdk_lvol_with_handle_req *req = cb_arg; 133 struct spdk_lvol *lvol = req->lvol; 134 135 if (lvolerrno != 0) { 136 SPDK_INFOLOG(lvol, "Failed to open lvol %s\n", lvol->unique_id); 137 goto end; 138 } 139 140 lvol->ref_count++; 141 lvol->blob = blob; 142 end: 143 req->cb_fn(req->cb_arg, lvol, lvolerrno); 144 free(req); 145 } 146 147 void 148 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg) 149 { 150 struct spdk_lvol_with_handle_req *req; 151 struct spdk_blob_open_opts opts; 152 153 assert(cb_fn != NULL); 154 155 if (lvol == NULL) { 156 SPDK_ERRLOG("lvol does not exist\n"); 157 cb_fn(cb_arg, NULL, -ENODEV); 158 return; 159 } 160 161 if (lvol->action_in_progress == true) { 162 SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n"); 163 cb_fn(cb_arg, lvol, -EBUSY); 164 return; 165 } 166 167 if (lvol->ref_count > 0) { 168 lvol->ref_count++; 169 cb_fn(cb_arg, lvol, 0); 170 return; 171 } 172 173 req = calloc(1, sizeof(*req)); 174 if (req == NULL) { 175 SPDK_ERRLOG("Cannot alloc memory for request structure\n"); 176 cb_fn(cb_arg, NULL, -ENOMEM); 177 return; 178 } 179 180 req->cb_fn = cb_fn; 181 req->cb_arg = cb_arg; 182 req->lvol = lvol; 183 184 spdk_blob_open_opts_init(&opts, sizeof(opts)); 185 opts.clear_method = lvol->clear_method; 186 187 spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, lvol_open_cb, req); 188 } 189 190 static void 191 bs_unload_with_error_cb(void *cb_arg, int lvolerrno) 192 { 193 struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg; 194 195 req->cb_fn(req->cb_arg, NULL, req->lvserrno); 196 free(req); 197 } 198 199 static void 200 load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno) 201 { 202 struct spdk_lvs_with_handle_req *req = cb_arg; 203 struct spdk_lvol_store *lvs = req->lvol_store; 204 struct spdk_blob_store *bs = lvs->blobstore; 205 struct spdk_lvol *lvol, *tmp; 206 spdk_blob_id blob_id; 207 const char *attr; 208 size_t value_len; 209 int rc; 210 211 if (lvolerrno == -ENOENT) { 212 /* Finished iterating */ 213 if (req->lvserrno == 0) { 214 lvs->load_esnaps = true; 215 req->cb_fn(req->cb_arg, lvs, req->lvserrno); 216 free(req); 217 } else { 218 TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) { 219 TAILQ_REMOVE(&lvs->lvols, lvol, link); 220 free(lvol); 221 } 222 lvs_free(lvs); 223 spdk_bs_unload(bs, bs_unload_with_error_cb, req); 224 } 225 return; 226 } else if (lvolerrno < 0) { 227 SPDK_ERRLOG("Failed to fetch blobs list\n"); 228 req->lvserrno = lvolerrno; 229 goto invalid; 230 } 231 232 blob_id = spdk_blob_get_id(blob); 233 234 if (blob_id == lvs->super_blob_id) { 235 SPDK_INFOLOG(lvol, "found superblob %"PRIu64"\n", (uint64_t)blob_id); 236 spdk_bs_iter_next(bs, blob, load_next_lvol, req); 237 return; 238 } 239 240 lvol = calloc(1, sizeof(*lvol)); 241 if (!lvol) { 242 SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n"); 243 req->lvserrno = -ENOMEM; 244 goto invalid; 245 } 246 247 /* 248 * Do not store a reference to blob now because spdk_bs_iter_next() will close it. 249 * Storing blob_id for future lookups is fine. 250 */ 251 lvol->blob_id = blob_id; 252 lvol->lvol_store = lvs; 253 254 rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len); 255 if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' || 256 spdk_uuid_parse(&lvol->uuid, attr) != 0) { 257 SPDK_INFOLOG(lvol, "Missing or corrupt lvol uuid\n"); 258 memset(&lvol->uuid, 0, sizeof(lvol->uuid)); 259 } 260 spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid); 261 262 if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) { 263 snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str); 264 } else { 265 spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid); 266 value_len = strlen(lvol->unique_id); 267 snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64, 268 (uint64_t)blob_id); 269 } 270 271 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len); 272 if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) { 273 SPDK_ERRLOG("Cannot assign lvol name\n"); 274 lvol_free(lvol); 275 req->lvserrno = -EINVAL; 276 goto invalid; 277 } 278 279 snprintf(lvol->name, sizeof(lvol->name), "%s", attr); 280 281 TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link); 282 283 lvs->lvol_count++; 284 285 SPDK_INFOLOG(lvol, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str); 286 287 invalid: 288 spdk_bs_iter_next(bs, blob, load_next_lvol, req); 289 } 290 291 static void 292 close_super_cb(void *cb_arg, int lvolerrno) 293 { 294 struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg; 295 struct spdk_lvol_store *lvs = req->lvol_store; 296 struct spdk_blob_store *bs = lvs->blobstore; 297 298 if (lvolerrno != 0) { 299 SPDK_INFOLOG(lvol, "Could not close super blob\n"); 300 lvs_free(lvs); 301 req->lvserrno = -ENODEV; 302 spdk_bs_unload(bs, bs_unload_with_error_cb, req); 303 return; 304 } 305 306 /* Start loading lvols */ 307 spdk_bs_iter_first(lvs->blobstore, load_next_lvol, req); 308 } 309 310 static void 311 close_super_blob_with_error_cb(void *cb_arg, int lvolerrno) 312 { 313 struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg; 314 struct spdk_lvol_store *lvs = req->lvol_store; 315 struct spdk_blob_store *bs = lvs->blobstore; 316 317 lvs_free(lvs); 318 319 spdk_bs_unload(bs, bs_unload_with_error_cb, req); 320 } 321 322 static void 323 lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno) 324 { 325 struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg; 326 struct spdk_lvol_store *lvs = req->lvol_store; 327 struct spdk_blob_store *bs = lvs->blobstore; 328 const char *attr; 329 size_t value_len; 330 int rc; 331 332 if (lvolerrno != 0) { 333 SPDK_INFOLOG(lvol, "Could not open super blob\n"); 334 lvs_free(lvs); 335 req->lvserrno = -ENODEV; 336 spdk_bs_unload(bs, bs_unload_with_error_cb, req); 337 return; 338 } 339 340 rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len); 341 if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') { 342 SPDK_INFOLOG(lvol, "degraded_set or incorrect UUID\n"); 343 req->lvserrno = -EINVAL; 344 spdk_blob_close(blob, close_super_blob_with_error_cb, req); 345 return; 346 } 347 348 if (spdk_uuid_parse(&lvs->uuid, attr)) { 349 SPDK_INFOLOG(lvol, "incorrect UUID '%s'\n", attr); 350 req->lvserrno = -EINVAL; 351 spdk_blob_close(blob, close_super_blob_with_error_cb, req); 352 return; 353 } 354 355 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len); 356 if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) { 357 SPDK_INFOLOG(lvol, "degraded_set or invalid name\n"); 358 req->lvserrno = -EINVAL; 359 spdk_blob_close(blob, close_super_blob_with_error_cb, req); 360 return; 361 } 362 363 snprintf(lvs->name, sizeof(lvs->name), "%s", attr); 364 365 rc = add_lvs_to_list(lvs); 366 if (rc) { 367 SPDK_INFOLOG(lvol, "lvolstore with name %s already exists\n", lvs->name); 368 req->lvserrno = -EEXIST; 369 spdk_blob_close(blob, close_super_blob_with_error_cb, req); 370 return; 371 } 372 373 lvs->super_blob_id = spdk_blob_get_id(blob); 374 375 spdk_blob_close(blob, close_super_cb, req); 376 } 377 378 static void 379 lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno) 380 { 381 struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg; 382 struct spdk_lvol_store *lvs = req->lvol_store; 383 struct spdk_blob_store *bs = lvs->blobstore; 384 385 if (lvolerrno != 0) { 386 SPDK_INFOLOG(lvol, "Super blob not found\n"); 387 lvs_free(lvs); 388 req->lvserrno = -ENODEV; 389 spdk_bs_unload(bs, bs_unload_with_error_cb, req); 390 return; 391 } 392 393 spdk_bs_open_blob(bs, blobid, lvs_read_uuid, req); 394 } 395 396 static void 397 lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno) 398 { 399 struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg; 400 struct spdk_lvol_store *lvs = req->lvol_store; 401 402 if (lvolerrno != 0) { 403 req->cb_fn(req->cb_arg, NULL, lvolerrno); 404 lvs_free(lvs); 405 free(req); 406 return; 407 } 408 409 lvs->blobstore = bs; 410 lvs->bs_dev = req->bs_dev; 411 412 spdk_bs_get_super(bs, lvs_open_super, req); 413 } 414 415 static void 416 lvs_bs_opts_init(struct spdk_bs_opts *opts) 417 { 418 spdk_bs_opts_init(opts, sizeof(*opts)); 419 opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS; 420 } 421 422 static void 423 lvs_load(struct spdk_bs_dev *bs_dev, const struct spdk_lvs_opts *_lvs_opts, 424 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg) 425 { 426 struct spdk_lvs_with_handle_req *req; 427 struct spdk_bs_opts bs_opts = {}; 428 struct spdk_lvs_opts lvs_opts; 429 430 assert(cb_fn != NULL); 431 432 if (bs_dev == NULL) { 433 SPDK_ERRLOG("Blobstore device does not exist\n"); 434 cb_fn(cb_arg, NULL, -ENODEV); 435 return; 436 } 437 438 spdk_lvs_opts_init(&lvs_opts); 439 if (_lvs_opts != NULL) { 440 if (lvs_opts_copy(_lvs_opts, &lvs_opts) != 0) { 441 SPDK_ERRLOG("Invalid options\n"); 442 cb_fn(cb_arg, NULL, -EINVAL); 443 return; 444 } 445 } 446 447 req = calloc(1, sizeof(*req)); 448 if (req == NULL) { 449 SPDK_ERRLOG("Cannot alloc memory for request structure\n"); 450 cb_fn(cb_arg, NULL, -ENOMEM); 451 return; 452 } 453 454 req->lvol_store = lvs_alloc(); 455 if (req->lvol_store == NULL) { 456 SPDK_ERRLOG("Cannot alloc memory for lvol store\n"); 457 free(req); 458 cb_fn(cb_arg, NULL, -ENOMEM); 459 return; 460 } 461 req->cb_fn = cb_fn; 462 req->cb_arg = cb_arg; 463 req->bs_dev = bs_dev; 464 465 lvs_bs_opts_init(&bs_opts); 466 snprintf(bs_opts.bstype.bstype, sizeof(bs_opts.bstype.bstype), "LVOLSTORE"); 467 468 if (lvs_opts.esnap_bs_dev_create != NULL) { 469 req->lvol_store->esnap_bs_dev_create = lvs_opts.esnap_bs_dev_create; 470 bs_opts.esnap_bs_dev_create = lvs_esnap_bs_dev_create; 471 bs_opts.esnap_ctx = req->lvol_store; 472 } 473 474 spdk_bs_load(bs_dev, &bs_opts, lvs_load_cb, req); 475 } 476 477 void 478 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg) 479 { 480 lvs_load(bs_dev, NULL, cb_fn, cb_arg); 481 } 482 483 void 484 spdk_lvs_load_ext(struct spdk_bs_dev *bs_dev, const struct spdk_lvs_opts *opts, 485 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg) 486 { 487 lvs_load(bs_dev, opts, cb_fn, cb_arg); 488 } 489 490 static void 491 remove_bs_on_error_cb(void *cb_arg, int bserrno) 492 { 493 } 494 495 static void 496 exit_error_lvs_req(struct spdk_lvs_with_handle_req *req, struct spdk_lvol_store *lvs, int lvolerrno) 497 { 498 req->cb_fn(req->cb_arg, NULL, lvolerrno); 499 spdk_bs_destroy(lvs->blobstore, remove_bs_on_error_cb, NULL); 500 lvs_free(lvs); 501 free(req); 502 } 503 504 static void 505 super_create_close_cb(void *cb_arg, int lvolerrno) 506 { 507 struct spdk_lvs_with_handle_req *req = cb_arg; 508 struct spdk_lvol_store *lvs = req->lvol_store; 509 510 if (lvolerrno < 0) { 511 SPDK_ERRLOG("Lvol store init failed: could not close super blob\n"); 512 exit_error_lvs_req(req, lvs, lvolerrno); 513 return; 514 } 515 516 req->cb_fn(req->cb_arg, lvs, lvolerrno); 517 free(req); 518 } 519 520 static void 521 super_blob_set_cb(void *cb_arg, int lvolerrno) 522 { 523 struct spdk_lvs_with_handle_req *req = cb_arg; 524 struct spdk_lvol_store *lvs = req->lvol_store; 525 struct spdk_blob *blob = lvs->super_blob; 526 527 if (lvolerrno < 0) { 528 SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n"); 529 exit_error_lvs_req(req, lvs, lvolerrno); 530 return; 531 } 532 533 spdk_blob_close(blob, super_create_close_cb, req); 534 } 535 536 static void 537 super_blob_init_cb(void *cb_arg, int lvolerrno) 538 { 539 struct spdk_lvs_with_handle_req *req = cb_arg; 540 struct spdk_lvol_store *lvs = req->lvol_store; 541 struct spdk_blob *blob = lvs->super_blob; 542 char uuid[SPDK_UUID_STRING_LEN]; 543 544 if (lvolerrno < 0) { 545 SPDK_ERRLOG("Lvol store init failed: could not set super blob\n"); 546 exit_error_lvs_req(req, lvs, lvolerrno); 547 return; 548 } 549 550 spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid); 551 552 spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid)); 553 spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1); 554 spdk_blob_sync_md(blob, super_blob_set_cb, req); 555 } 556 557 static void 558 super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno) 559 { 560 struct spdk_lvs_with_handle_req *req = cb_arg; 561 struct spdk_lvol_store *lvs = req->lvol_store; 562 563 if (lvolerrno < 0) { 564 SPDK_ERRLOG("Lvol store init failed: could not open super blob\n"); 565 exit_error_lvs_req(req, lvs, lvolerrno); 566 return; 567 } 568 569 lvs->super_blob = blob; 570 lvs->super_blob_id = spdk_blob_get_id(blob); 571 572 spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, super_blob_init_cb, req); 573 } 574 575 static void 576 super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno) 577 { 578 struct spdk_lvs_with_handle_req *req = cb_arg; 579 struct spdk_lvol_store *lvs = req->lvol_store; 580 struct spdk_blob_store *bs; 581 582 if (lvolerrno < 0) { 583 SPDK_ERRLOG("Lvol store init failed: could not create super blob\n"); 584 exit_error_lvs_req(req, lvs, lvolerrno); 585 return; 586 } 587 588 bs = req->lvol_store->blobstore; 589 590 spdk_bs_open_blob(bs, blobid, super_blob_create_open_cb, req); 591 } 592 593 static void 594 lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno) 595 { 596 struct spdk_lvs_with_handle_req *lvs_req = cb_arg; 597 struct spdk_lvol_store *lvs = lvs_req->lvol_store; 598 599 if (lvserrno != 0) { 600 assert(bs == NULL); 601 lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno); 602 SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n"); 603 lvs_free(lvs); 604 free(lvs_req); 605 return; 606 } 607 608 assert(bs != NULL); 609 lvs->blobstore = bs; 610 611 SPDK_INFOLOG(lvol, "Lvol store initialized\n"); 612 613 /* create super blob */ 614 spdk_bs_create_blob(lvs->blobstore, super_blob_create_cb, lvs_req); 615 } 616 617 void 618 spdk_lvs_opts_init(struct spdk_lvs_opts *o) 619 { 620 memset(o, 0, sizeof(*o)); 621 o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ; 622 o->clear_method = LVS_CLEAR_WITH_UNMAP; 623 o->num_md_pages_per_cluster_ratio = 100; 624 o->opts_size = sizeof(*o); 625 } 626 627 static inline int 628 lvs_opts_copy(const struct spdk_lvs_opts *src, struct spdk_lvs_opts *dst) 629 { 630 if (src->opts_size == 0) { 631 SPDK_ERRLOG("opts_size should not be zero value\n"); 632 return -1; 633 } 634 #define FIELD_OK(field) \ 635 offsetof(struct spdk_lvs_opts, field) + sizeof(src->field) <= src->opts_size 636 637 #define SET_FIELD(field) \ 638 if (FIELD_OK(field)) { \ 639 dst->field = src->field; \ 640 } \ 641 642 SET_FIELD(cluster_sz); 643 SET_FIELD(clear_method); 644 if (FIELD_OK(name)) { 645 memcpy(&dst->name, &src->name, sizeof(dst->name)); 646 } 647 SET_FIELD(num_md_pages_per_cluster_ratio); 648 SET_FIELD(opts_size); 649 SET_FIELD(esnap_bs_dev_create); 650 651 dst->opts_size = src->opts_size; 652 653 /* You should not remove this statement, but need to update the assert statement 654 * if you add a new field, and also add a corresponding SET_FIELD statement */ 655 SPDK_STATIC_ASSERT(sizeof(struct spdk_lvs_opts) == 88, "Incorrect size"); 656 657 #undef FIELD_OK 658 #undef SET_FIELD 659 660 return 0; 661 } 662 663 static void 664 setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o, uint32_t total_clusters, 665 void *esnap_ctx) 666 { 667 assert(o != NULL); 668 lvs_bs_opts_init(bs_opts); 669 bs_opts->cluster_sz = o->cluster_sz; 670 bs_opts->clear_method = (enum bs_clear_method)o->clear_method; 671 bs_opts->num_md_pages = (o->num_md_pages_per_cluster_ratio * total_clusters) / 100; 672 bs_opts->esnap_bs_dev_create = o->esnap_bs_dev_create; 673 bs_opts->esnap_ctx = esnap_ctx; 674 snprintf(bs_opts->bstype.bstype, sizeof(bs_opts->bstype.bstype), "LVOLSTORE"); 675 } 676 677 int 678 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o, 679 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg) 680 { 681 struct spdk_lvol_store *lvs; 682 struct spdk_lvs_with_handle_req *lvs_req; 683 struct spdk_bs_opts opts = {}; 684 struct spdk_lvs_opts lvs_opts; 685 uint32_t total_clusters; 686 int rc; 687 688 if (bs_dev == NULL) { 689 SPDK_ERRLOG("Blobstore device does not exist\n"); 690 return -ENODEV; 691 } 692 693 if (o == NULL) { 694 SPDK_ERRLOG("spdk_lvs_opts not specified\n"); 695 return -EINVAL; 696 } 697 698 spdk_lvs_opts_init(&lvs_opts); 699 if (lvs_opts_copy(o, &lvs_opts) != 0) { 700 SPDK_ERRLOG("spdk_lvs_opts invalid\n"); 701 return -EINVAL; 702 } 703 704 if (lvs_opts.cluster_sz < bs_dev->blocklen) { 705 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than blocklen %" PRIu32 "\n", 706 lvs_opts.cluster_sz, bs_dev->blocklen); 707 return -EINVAL; 708 } 709 total_clusters = bs_dev->blockcnt / (lvs_opts.cluster_sz / bs_dev->blocklen); 710 711 lvs = lvs_alloc(); 712 if (!lvs) { 713 SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n"); 714 return -ENOMEM; 715 } 716 717 setup_lvs_opts(&opts, o, total_clusters, lvs); 718 719 if (strnlen(lvs_opts.name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) { 720 SPDK_ERRLOG("Name has no null terminator.\n"); 721 lvs_free(lvs); 722 return -EINVAL; 723 } 724 725 if (strnlen(lvs_opts.name, SPDK_LVS_NAME_MAX) == 0) { 726 SPDK_ERRLOG("No name specified.\n"); 727 lvs_free(lvs); 728 return -EINVAL; 729 } 730 731 spdk_uuid_generate(&lvs->uuid); 732 snprintf(lvs->name, sizeof(lvs->name), "%s", lvs_opts.name); 733 734 rc = add_lvs_to_list(lvs); 735 if (rc) { 736 SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name); 737 lvs_free(lvs); 738 return -EEXIST; 739 } 740 741 lvs_req = calloc(1, sizeof(*lvs_req)); 742 if (!lvs_req) { 743 lvs_free(lvs); 744 SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n"); 745 return -ENOMEM; 746 } 747 748 assert(cb_fn != NULL); 749 lvs_req->cb_fn = cb_fn; 750 lvs_req->cb_arg = cb_arg; 751 lvs_req->lvol_store = lvs; 752 lvs->bs_dev = bs_dev; 753 754 SPDK_INFOLOG(lvol, "Initializing lvol store\n"); 755 spdk_bs_init(bs_dev, &opts, lvs_init_cb, lvs_req); 756 757 return 0; 758 } 759 760 static void 761 lvs_rename_cb(void *cb_arg, int lvolerrno) 762 { 763 struct spdk_lvs_req *req = cb_arg; 764 765 if (lvolerrno != 0) { 766 req->lvserrno = lvolerrno; 767 } 768 if (req->lvserrno != 0) { 769 SPDK_ERRLOG("Lvol store rename operation failed\n"); 770 /* Lvs renaming failed, so we should 'clear' new_name. 771 * Otherwise it could cause a failure on the next attempt to change the name to 'new_name' */ 772 snprintf(req->lvol_store->new_name, 773 sizeof(req->lvol_store->new_name), 774 "%s", req->lvol_store->name); 775 } else { 776 /* Update lvs name with new_name */ 777 snprintf(req->lvol_store->name, 778 sizeof(req->lvol_store->name), 779 "%s", req->lvol_store->new_name); 780 } 781 782 req->cb_fn(req->cb_arg, req->lvserrno); 783 free(req); 784 } 785 786 static void 787 lvs_rename_sync_cb(void *cb_arg, int lvolerrno) 788 { 789 struct spdk_lvs_req *req = cb_arg; 790 struct spdk_blob *blob = req->lvol_store->super_blob; 791 792 if (lvolerrno < 0) { 793 req->lvserrno = lvolerrno; 794 } 795 796 spdk_blob_close(blob, lvs_rename_cb, req); 797 } 798 799 static void 800 lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno) 801 { 802 struct spdk_lvs_req *req = cb_arg; 803 int rc; 804 805 if (lvolerrno < 0) { 806 lvs_rename_cb(cb_arg, lvolerrno); 807 return; 808 } 809 810 rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name, 811 strlen(req->lvol_store->new_name) + 1); 812 if (rc < 0) { 813 req->lvserrno = rc; 814 lvs_rename_sync_cb(req, rc); 815 return; 816 } 817 818 req->lvol_store->super_blob = blob; 819 820 spdk_blob_sync_md(blob, lvs_rename_sync_cb, req); 821 } 822 823 void 824 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name, 825 spdk_lvs_op_complete cb_fn, void *cb_arg) 826 { 827 struct spdk_lvs_req *req; 828 struct spdk_lvol_store *tmp; 829 830 /* Check if new name is current lvs name. 831 * If so, return success immediately */ 832 if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) { 833 cb_fn(cb_arg, 0); 834 return; 835 } 836 837 /* Check if new or new_name is already used in other lvs */ 838 pthread_mutex_lock(&g_lvol_stores_mutex); 839 TAILQ_FOREACH(tmp, &g_lvol_stores, link) { 840 if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) || 841 !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) { 842 pthread_mutex_unlock(&g_lvol_stores_mutex); 843 cb_fn(cb_arg, -EEXIST); 844 return; 845 } 846 } 847 pthread_mutex_unlock(&g_lvol_stores_mutex); 848 849 req = calloc(1, sizeof(*req)); 850 if (!req) { 851 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 852 cb_fn(cb_arg, -ENOMEM); 853 return; 854 } 855 snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name); 856 req->lvol_store = lvs; 857 req->cb_fn = cb_fn; 858 req->cb_arg = cb_arg; 859 860 spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, lvs_rename_open_cb, req); 861 } 862 863 static void 864 _lvs_unload_cb(void *cb_arg, int lvserrno) 865 { 866 struct spdk_lvs_req *lvs_req = cb_arg; 867 868 SPDK_INFOLOG(lvol, "Lvol store unloaded\n"); 869 assert(lvs_req->cb_fn != NULL); 870 lvs_req->cb_fn(lvs_req->cb_arg, lvserrno); 871 free(lvs_req); 872 } 873 874 int 875 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, 876 void *cb_arg) 877 { 878 struct spdk_lvs_req *lvs_req; 879 struct spdk_lvol *lvol, *tmp; 880 881 if (lvs == NULL) { 882 SPDK_ERRLOG("Lvol store is NULL\n"); 883 return -ENODEV; 884 } 885 886 TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) { 887 if (lvol->action_in_progress == true) { 888 SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n"); 889 cb_fn(cb_arg, -EBUSY); 890 return -EBUSY; 891 } else if (lvol->ref_count != 0) { 892 SPDK_ERRLOG("Lvols still open on lvol store\n"); 893 cb_fn(cb_arg, -EBUSY); 894 return -EBUSY; 895 } 896 } 897 898 TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) { 899 spdk_lvs_esnap_missing_remove(lvol); 900 TAILQ_REMOVE(&lvs->lvols, lvol, link); 901 lvol_free(lvol); 902 } 903 904 lvs_req = calloc(1, sizeof(*lvs_req)); 905 if (!lvs_req) { 906 SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n"); 907 return -ENOMEM; 908 } 909 910 lvs_req->cb_fn = cb_fn; 911 lvs_req->cb_arg = cb_arg; 912 913 SPDK_INFOLOG(lvol, "Unloading lvol store\n"); 914 spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req); 915 lvs_free(lvs); 916 917 return 0; 918 } 919 920 static void 921 _lvs_destroy_cb(void *cb_arg, int lvserrno) 922 { 923 struct spdk_lvs_destroy_req *lvs_req = cb_arg; 924 925 SPDK_INFOLOG(lvol, "Lvol store destroyed\n"); 926 assert(lvs_req->cb_fn != NULL); 927 lvs_req->cb_fn(lvs_req->cb_arg, lvserrno); 928 free(lvs_req); 929 } 930 931 static void 932 _lvs_destroy_super_cb(void *cb_arg, int bserrno) 933 { 934 struct spdk_lvs_destroy_req *lvs_req = cb_arg; 935 struct spdk_lvol_store *lvs = lvs_req->lvs; 936 937 assert(lvs != NULL); 938 939 SPDK_INFOLOG(lvol, "Destroying lvol store\n"); 940 spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req); 941 lvs_free(lvs); 942 } 943 944 int 945 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, 946 void *cb_arg) 947 { 948 struct spdk_lvs_destroy_req *lvs_req; 949 struct spdk_lvol *iter_lvol, *tmp; 950 951 if (lvs == NULL) { 952 SPDK_ERRLOG("Lvol store is NULL\n"); 953 return -ENODEV; 954 } 955 956 TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) { 957 if (iter_lvol->action_in_progress == true) { 958 SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n"); 959 cb_fn(cb_arg, -EBUSY); 960 return -EBUSY; 961 } else if (iter_lvol->ref_count != 0) { 962 SPDK_ERRLOG("Lvols still open on lvol store\n"); 963 cb_fn(cb_arg, -EBUSY); 964 return -EBUSY; 965 } 966 } 967 968 TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) { 969 free(iter_lvol); 970 } 971 972 lvs_req = calloc(1, sizeof(*lvs_req)); 973 if (!lvs_req) { 974 SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n"); 975 return -ENOMEM; 976 } 977 978 lvs_req->cb_fn = cb_fn; 979 lvs_req->cb_arg = cb_arg; 980 lvs_req->lvs = lvs; 981 982 SPDK_INFOLOG(lvol, "Deleting super blob\n"); 983 spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req); 984 985 return 0; 986 } 987 988 static void 989 lvol_close_blob_cb(void *cb_arg, int lvolerrno) 990 { 991 struct spdk_lvol_req *req = cb_arg; 992 struct spdk_lvol *lvol = req->lvol; 993 994 if (lvolerrno < 0) { 995 SPDK_ERRLOG("Could not close blob on lvol\n"); 996 lvol_free(lvol); 997 goto end; 998 } 999 1000 lvol->ref_count--; 1001 lvol->action_in_progress = false; 1002 lvol->blob = NULL; 1003 SPDK_INFOLOG(lvol, "Lvol %s closed\n", lvol->unique_id); 1004 1005 end: 1006 req->cb_fn(req->cb_arg, lvolerrno); 1007 free(req); 1008 } 1009 1010 bool 1011 spdk_lvol_deletable(struct spdk_lvol *lvol) 1012 { 1013 size_t count = 0; 1014 1015 spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count); 1016 return (count == 0); 1017 } 1018 1019 static void 1020 lvol_delete_blob_cb(void *cb_arg, int lvolerrno) 1021 { 1022 struct spdk_lvol_req *req = cb_arg; 1023 struct spdk_lvol *lvol = req->lvol; 1024 struct spdk_lvol *clone_lvol = req->clone_lvol; 1025 1026 if (lvolerrno < 0) { 1027 SPDK_ERRLOG("Could not remove blob on lvol gracefully - forced removal\n"); 1028 } else { 1029 SPDK_INFOLOG(lvol, "Lvol %s deleted\n", lvol->unique_id); 1030 } 1031 1032 if (lvol->degraded_set != NULL) { 1033 if (clone_lvol != NULL) { 1034 /* 1035 * A degraded esnap clone that has a blob clone has been deleted. clone_lvol 1036 * becomes an esnap clone and needs to be associated with the 1037 * spdk_lvs_degraded_lvol_set. 1038 */ 1039 struct spdk_lvs_degraded_lvol_set *degraded_set = lvol->degraded_set; 1040 1041 lvs_degraded_lvol_set_remove(degraded_set, lvol); 1042 lvs_degraded_lvol_set_add(degraded_set, clone_lvol); 1043 } else { 1044 spdk_lvs_esnap_missing_remove(lvol); 1045 } 1046 } 1047 1048 TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link); 1049 lvol_free(lvol); 1050 req->cb_fn(req->cb_arg, lvolerrno); 1051 free(req); 1052 } 1053 1054 static void 1055 lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno) 1056 { 1057 struct spdk_lvol_with_handle_req *req = cb_arg; 1058 struct spdk_lvol *lvol = req->lvol; 1059 1060 TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link); 1061 1062 if (lvolerrno < 0) { 1063 free(lvol); 1064 req->cb_fn(req->cb_arg, NULL, lvolerrno); 1065 free(req); 1066 return; 1067 } 1068 1069 lvol->blob = blob; 1070 lvol->blob_id = spdk_blob_get_id(blob); 1071 1072 TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link); 1073 1074 lvol->ref_count++; 1075 1076 assert(req->cb_fn != NULL); 1077 req->cb_fn(req->cb_arg, req->lvol, lvolerrno); 1078 free(req); 1079 } 1080 1081 static void 1082 lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno) 1083 { 1084 struct spdk_lvol_with_handle_req *req = cb_arg; 1085 struct spdk_blob_store *bs; 1086 struct spdk_blob_open_opts opts; 1087 1088 if (lvolerrno < 0) { 1089 TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link); 1090 free(req->lvol); 1091 assert(req->cb_fn != NULL); 1092 req->cb_fn(req->cb_arg, NULL, lvolerrno); 1093 free(req); 1094 return; 1095 } 1096 1097 spdk_blob_open_opts_init(&opts, sizeof(opts)); 1098 opts.clear_method = req->lvol->clear_method; 1099 /* 1100 * If the lvol that is being created is an esnap clone, the blobstore needs to be able to 1101 * pass the lvol to the esnap_bs_dev_create callback. In order for that to happen, we need 1102 * to pass it here. 1103 * 1104 * This does set ensap_ctx in cases where it's not needed, but we don't know that it's not 1105 * needed until after the blob is open. When the blob is not an esnap clone, a reference to 1106 * the value stored in opts.esnap_ctx is not retained by the blobstore. 1107 */ 1108 opts.esnap_ctx = req->lvol; 1109 bs = req->lvol->lvol_store->blobstore; 1110 1111 if (req->origlvol != NULL && req->origlvol->degraded_set != NULL) { 1112 /* 1113 * A snapshot was created from a degraded esnap clone. The new snapshot is now a 1114 * degraded esnap clone. The previous clone is now a regular clone of a blob. Update 1115 * the set of directly-related clones to the missing external snapshot. 1116 */ 1117 struct spdk_lvs_degraded_lvol_set *degraded_set = req->origlvol->degraded_set; 1118 1119 lvs_degraded_lvol_set_remove(degraded_set, req->origlvol); 1120 lvs_degraded_lvol_set_add(degraded_set, req->lvol); 1121 } 1122 1123 spdk_bs_open_blob_ext(bs, blobid, &opts, lvol_create_open_cb, req); 1124 } 1125 1126 static void 1127 lvol_get_xattr_value(void *xattr_ctx, const char *name, 1128 const void **value, size_t *value_len) 1129 { 1130 struct spdk_lvol *lvol = xattr_ctx; 1131 1132 if (!strcmp(LVOL_NAME, name)) { 1133 *value = lvol->name; 1134 *value_len = SPDK_LVOL_NAME_MAX; 1135 return; 1136 } 1137 if (!strcmp("uuid", name)) { 1138 *value = lvol->uuid_str; 1139 *value_len = sizeof(lvol->uuid_str); 1140 return; 1141 } 1142 *value = NULL; 1143 *value_len = 0; 1144 } 1145 1146 static int 1147 lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name) 1148 { 1149 struct spdk_lvol *tmp; 1150 1151 if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) { 1152 SPDK_INFOLOG(lvol, "lvol name not provided.\n"); 1153 return -EINVAL; 1154 } 1155 1156 if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) { 1157 SPDK_ERRLOG("Name has no null terminator.\n"); 1158 return -EINVAL; 1159 } 1160 1161 TAILQ_FOREACH(tmp, &lvs->lvols, link) { 1162 if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) { 1163 SPDK_ERRLOG("lvol with name %s already exists\n", name); 1164 return -EEXIST; 1165 } 1166 } 1167 1168 TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) { 1169 if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) { 1170 SPDK_ERRLOG("lvol with name %s is being already created\n", name); 1171 return -EEXIST; 1172 } 1173 } 1174 1175 return 0; 1176 } 1177 1178 int 1179 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz, 1180 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn, 1181 void *cb_arg) 1182 { 1183 struct spdk_lvol_with_handle_req *req; 1184 struct spdk_blob_store *bs; 1185 struct spdk_lvol *lvol; 1186 struct spdk_blob_opts opts; 1187 char *xattr_names[] = {LVOL_NAME, "uuid"}; 1188 int rc; 1189 1190 if (lvs == NULL) { 1191 SPDK_ERRLOG("lvol store does not exist\n"); 1192 return -EINVAL; 1193 } 1194 1195 rc = lvs_verify_lvol_name(lvs, name); 1196 if (rc < 0) { 1197 return rc; 1198 } 1199 1200 bs = lvs->blobstore; 1201 1202 req = calloc(1, sizeof(*req)); 1203 if (!req) { 1204 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1205 return -ENOMEM; 1206 } 1207 req->cb_fn = cb_fn; 1208 req->cb_arg = cb_arg; 1209 1210 lvol = lvol_alloc(lvs, name, thin_provision, clear_method); 1211 if (!lvol) { 1212 free(req); 1213 SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n"); 1214 return -ENOMEM; 1215 } 1216 1217 req->lvol = lvol; 1218 spdk_blob_opts_init(&opts, sizeof(opts)); 1219 opts.thin_provision = thin_provision; 1220 opts.num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs)); 1221 opts.clear_method = lvol->clear_method; 1222 opts.xattrs.count = SPDK_COUNTOF(xattr_names); 1223 opts.xattrs.names = xattr_names; 1224 opts.xattrs.ctx = lvol; 1225 opts.xattrs.get_value = lvol_get_xattr_value; 1226 1227 spdk_bs_create_blob_ext(lvs->blobstore, &opts, lvol_create_cb, req); 1228 1229 return 0; 1230 } 1231 1232 int 1233 spdk_lvol_create_esnap_clone(const void *esnap_id, uint32_t id_len, uint64_t size_bytes, 1234 struct spdk_lvol_store *lvs, const char *clone_name, 1235 spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg) 1236 { 1237 struct spdk_lvol_with_handle_req *req; 1238 struct spdk_blob_store *bs; 1239 struct spdk_lvol *lvol; 1240 struct spdk_blob_opts opts; 1241 char *xattr_names[] = {LVOL_NAME, "uuid"}; 1242 int rc; 1243 1244 if (lvs == NULL) { 1245 SPDK_ERRLOG("lvol store does not exist\n"); 1246 return -EINVAL; 1247 } 1248 1249 rc = lvs_verify_lvol_name(lvs, clone_name); 1250 if (rc < 0) { 1251 return rc; 1252 } 1253 1254 bs = lvs->blobstore; 1255 1256 req = calloc(1, sizeof(*req)); 1257 if (!req) { 1258 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1259 return -ENOMEM; 1260 } 1261 req->cb_fn = cb_fn; 1262 req->cb_arg = cb_arg; 1263 1264 lvol = lvol_alloc(lvs, clone_name, true, LVOL_CLEAR_WITH_DEFAULT); 1265 if (!lvol) { 1266 free(req); 1267 SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n"); 1268 return -ENOMEM; 1269 } 1270 req->lvol = lvol; 1271 1272 spdk_blob_opts_init(&opts, sizeof(opts)); 1273 opts.esnap_id = esnap_id; 1274 opts.esnap_id_len = id_len; 1275 opts.thin_provision = true; 1276 opts.num_clusters = spdk_divide_round_up(size_bytes, spdk_bs_get_cluster_size(bs)); 1277 opts.clear_method = lvol->clear_method; 1278 opts.xattrs.count = SPDK_COUNTOF(xattr_names); 1279 opts.xattrs.names = xattr_names; 1280 opts.xattrs.ctx = lvol; 1281 opts.xattrs.get_value = lvol_get_xattr_value; 1282 1283 spdk_bs_create_blob_ext(lvs->blobstore, &opts, lvol_create_cb, req); 1284 1285 return 0; 1286 } 1287 1288 void 1289 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name, 1290 spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg) 1291 { 1292 struct spdk_lvol_store *lvs; 1293 struct spdk_lvol *newlvol; 1294 struct spdk_blob *origblob; 1295 struct spdk_lvol_with_handle_req *req; 1296 struct spdk_blob_xattr_opts snapshot_xattrs; 1297 char *xattr_names[] = {LVOL_NAME, "uuid"}; 1298 int rc; 1299 1300 if (origlvol == NULL) { 1301 SPDK_INFOLOG(lvol, "Lvol not provided.\n"); 1302 cb_fn(cb_arg, NULL, -EINVAL); 1303 return; 1304 } 1305 1306 origblob = origlvol->blob; 1307 lvs = origlvol->lvol_store; 1308 if (lvs == NULL) { 1309 SPDK_ERRLOG("lvol store does not exist\n"); 1310 cb_fn(cb_arg, NULL, -EINVAL); 1311 return; 1312 } 1313 1314 rc = lvs_verify_lvol_name(lvs, snapshot_name); 1315 if (rc < 0) { 1316 cb_fn(cb_arg, NULL, rc); 1317 return; 1318 } 1319 1320 req = calloc(1, sizeof(*req)); 1321 if (!req) { 1322 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1323 cb_fn(cb_arg, NULL, -ENOMEM); 1324 return; 1325 } 1326 1327 newlvol = lvol_alloc(origlvol->lvol_store, snapshot_name, true, 1328 (enum lvol_clear_method)origlvol->clear_method); 1329 if (!newlvol) { 1330 SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n"); 1331 free(req); 1332 cb_fn(cb_arg, NULL, -ENOMEM); 1333 return; 1334 } 1335 1336 snapshot_xattrs.count = SPDK_COUNTOF(xattr_names); 1337 snapshot_xattrs.ctx = newlvol; 1338 snapshot_xattrs.names = xattr_names; 1339 snapshot_xattrs.get_value = lvol_get_xattr_value; 1340 req->lvol = newlvol; 1341 req->origlvol = origlvol; 1342 req->cb_fn = cb_fn; 1343 req->cb_arg = cb_arg; 1344 1345 spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs, 1346 lvol_create_cb, req); 1347 } 1348 1349 void 1350 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name, 1351 spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg) 1352 { 1353 struct spdk_lvol *newlvol; 1354 struct spdk_lvol_with_handle_req *req; 1355 struct spdk_lvol_store *lvs; 1356 struct spdk_blob *origblob; 1357 struct spdk_blob_xattr_opts clone_xattrs; 1358 char *xattr_names[] = {LVOL_NAME, "uuid"}; 1359 int rc; 1360 1361 if (origlvol == NULL) { 1362 SPDK_INFOLOG(lvol, "Lvol not provided.\n"); 1363 cb_fn(cb_arg, NULL, -EINVAL); 1364 return; 1365 } 1366 1367 origblob = origlvol->blob; 1368 lvs = origlvol->lvol_store; 1369 if (lvs == NULL) { 1370 SPDK_ERRLOG("lvol store does not exist\n"); 1371 cb_fn(cb_arg, NULL, -EINVAL); 1372 return; 1373 } 1374 1375 rc = lvs_verify_lvol_name(lvs, clone_name); 1376 if (rc < 0) { 1377 cb_fn(cb_arg, NULL, rc); 1378 return; 1379 } 1380 1381 req = calloc(1, sizeof(*req)); 1382 if (!req) { 1383 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1384 cb_fn(cb_arg, NULL, -ENOMEM); 1385 return; 1386 } 1387 1388 newlvol = lvol_alloc(lvs, clone_name, true, (enum lvol_clear_method)origlvol->clear_method); 1389 if (!newlvol) { 1390 SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n"); 1391 free(req); 1392 cb_fn(cb_arg, NULL, -ENOMEM); 1393 return; 1394 } 1395 1396 clone_xattrs.count = SPDK_COUNTOF(xattr_names); 1397 clone_xattrs.ctx = newlvol; 1398 clone_xattrs.names = xattr_names; 1399 clone_xattrs.get_value = lvol_get_xattr_value; 1400 req->lvol = newlvol; 1401 req->cb_fn = cb_fn; 1402 req->cb_arg = cb_arg; 1403 1404 spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs, 1405 lvol_create_cb, 1406 req); 1407 } 1408 1409 static void 1410 lvol_resize_done(void *cb_arg, int lvolerrno) 1411 { 1412 struct spdk_lvol_req *req = cb_arg; 1413 1414 req->cb_fn(req->cb_arg, lvolerrno); 1415 free(req); 1416 } 1417 1418 static void 1419 lvol_blob_resize_cb(void *cb_arg, int bserrno) 1420 { 1421 struct spdk_lvol_req *req = cb_arg; 1422 struct spdk_lvol *lvol = req->lvol; 1423 1424 if (bserrno != 0) { 1425 req->cb_fn(req->cb_arg, bserrno); 1426 free(req); 1427 return; 1428 } 1429 1430 spdk_blob_sync_md(lvol->blob, lvol_resize_done, req); 1431 } 1432 1433 void 1434 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, 1435 spdk_lvol_op_complete cb_fn, void *cb_arg) 1436 { 1437 struct spdk_blob *blob = lvol->blob; 1438 struct spdk_lvol_store *lvs = lvol->lvol_store; 1439 struct spdk_lvol_req *req; 1440 uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore)); 1441 1442 req = calloc(1, sizeof(*req)); 1443 if (!req) { 1444 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1445 cb_fn(cb_arg, -ENOMEM); 1446 return; 1447 } 1448 req->cb_fn = cb_fn; 1449 req->cb_arg = cb_arg; 1450 req->lvol = lvol; 1451 1452 spdk_blob_resize(blob, new_clusters, lvol_blob_resize_cb, req); 1453 } 1454 1455 static void 1456 lvol_set_read_only_cb(void *cb_arg, int lvolerrno) 1457 { 1458 struct spdk_lvol_req *req = cb_arg; 1459 1460 req->cb_fn(req->cb_arg, lvolerrno); 1461 free(req); 1462 } 1463 1464 void 1465 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg) 1466 { 1467 struct spdk_lvol_req *req; 1468 1469 req = calloc(1, sizeof(*req)); 1470 if (!req) { 1471 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1472 cb_fn(cb_arg, -ENOMEM); 1473 return; 1474 } 1475 req->cb_fn = cb_fn; 1476 req->cb_arg = cb_arg; 1477 1478 spdk_blob_set_read_only(lvol->blob); 1479 spdk_blob_sync_md(lvol->blob, lvol_set_read_only_cb, req); 1480 } 1481 1482 static void 1483 lvol_rename_cb(void *cb_arg, int lvolerrno) 1484 { 1485 struct spdk_lvol_req *req = cb_arg; 1486 1487 if (lvolerrno != 0) { 1488 SPDK_ERRLOG("Lvol rename operation failed\n"); 1489 } else { 1490 snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name); 1491 } 1492 1493 req->cb_fn(req->cb_arg, lvolerrno); 1494 free(req); 1495 } 1496 1497 void 1498 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name, 1499 spdk_lvol_op_complete cb_fn, void *cb_arg) 1500 { 1501 struct spdk_lvol *tmp; 1502 struct spdk_blob *blob = lvol->blob; 1503 struct spdk_lvol_req *req; 1504 int rc; 1505 1506 /* Check if new name is current lvol name. 1507 * If so, return success immediately */ 1508 if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) { 1509 cb_fn(cb_arg, 0); 1510 return; 1511 } 1512 1513 /* Check if lvol with 'new_name' already exists in lvolstore */ 1514 TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) { 1515 if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) { 1516 SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name); 1517 cb_fn(cb_arg, -EEXIST); 1518 return; 1519 } 1520 } 1521 1522 req = calloc(1, sizeof(*req)); 1523 if (!req) { 1524 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1525 cb_fn(cb_arg, -ENOMEM); 1526 return; 1527 } 1528 req->cb_fn = cb_fn; 1529 req->cb_arg = cb_arg; 1530 req->lvol = lvol; 1531 snprintf(req->name, sizeof(req->name), "%s", new_name); 1532 1533 rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1534 if (rc < 0) { 1535 free(req); 1536 cb_fn(cb_arg, rc); 1537 return; 1538 } 1539 1540 spdk_blob_sync_md(blob, lvol_rename_cb, req); 1541 } 1542 1543 void 1544 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg) 1545 { 1546 struct spdk_lvol_req *req; 1547 struct spdk_blob_store *bs; 1548 struct spdk_lvol_store *lvs = lvol->lvol_store; 1549 spdk_blob_id clone_id; 1550 size_t count = 1; 1551 int rc; 1552 1553 assert(cb_fn != NULL); 1554 1555 if (lvol == NULL) { 1556 SPDK_ERRLOG("lvol does not exist\n"); 1557 cb_fn(cb_arg, -ENODEV); 1558 return; 1559 } 1560 1561 if (lvol->ref_count != 0) { 1562 SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id); 1563 cb_fn(cb_arg, -EBUSY); 1564 return; 1565 } 1566 1567 lvol->action_in_progress = true; 1568 1569 req = calloc(1, sizeof(*req)); 1570 if (!req) { 1571 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1572 cb_fn(cb_arg, -ENOMEM); 1573 return; 1574 } 1575 1576 req->cb_fn = cb_fn; 1577 req->cb_arg = cb_arg; 1578 req->lvol = lvol; 1579 bs = lvol->lvol_store->blobstore; 1580 1581 rc = spdk_blob_get_clones(lvs->blobstore, lvol->blob_id, &clone_id, &count); 1582 if (rc == 0 && count == 1) { 1583 req->clone_lvol = lvs_get_lvol_by_blob_id(lvs, clone_id); 1584 } else if (rc == -ENOMEM) { 1585 SPDK_INFOLOG(lvol, "lvol %s: cannot destroy: has %" PRIu64 " clones\n", 1586 lvol->unique_id, count); 1587 free(req); 1588 assert(count > 1); 1589 cb_fn(cb_arg, -EBUSY); 1590 return; 1591 } 1592 1593 spdk_bs_delete_blob(bs, lvol->blob_id, lvol_delete_blob_cb, req); 1594 } 1595 1596 void 1597 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg) 1598 { 1599 struct spdk_lvol_req *req; 1600 1601 assert(cb_fn != NULL); 1602 1603 if (lvol == NULL) { 1604 SPDK_ERRLOG("lvol does not exist\n"); 1605 cb_fn(cb_arg, -ENODEV); 1606 return; 1607 } 1608 1609 if (lvol->ref_count > 1) { 1610 lvol->ref_count--; 1611 cb_fn(cb_arg, 0); 1612 return; 1613 } else if (lvol->ref_count == 0) { 1614 cb_fn(cb_arg, -EINVAL); 1615 return; 1616 } 1617 1618 lvol->action_in_progress = true; 1619 1620 req = calloc(1, sizeof(*req)); 1621 if (!req) { 1622 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1623 cb_fn(cb_arg, -ENOMEM); 1624 return; 1625 } 1626 1627 req->cb_fn = cb_fn; 1628 req->cb_arg = cb_arg; 1629 req->lvol = lvol; 1630 1631 spdk_blob_close(lvol->blob, lvol_close_blob_cb, req); 1632 } 1633 1634 struct spdk_io_channel * 1635 spdk_lvol_get_io_channel(struct spdk_lvol *lvol) 1636 { 1637 return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore); 1638 } 1639 1640 static void 1641 lvol_inflate_cb(void *cb_arg, int lvolerrno) 1642 { 1643 struct spdk_lvol_req *req = cb_arg; 1644 1645 spdk_bs_free_io_channel(req->channel); 1646 1647 if (lvolerrno < 0) { 1648 SPDK_ERRLOG("Could not inflate lvol\n"); 1649 } 1650 1651 req->cb_fn(req->cb_arg, lvolerrno); 1652 free(req); 1653 } 1654 1655 void 1656 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg) 1657 { 1658 struct spdk_lvol_req *req; 1659 spdk_blob_id blob_id; 1660 1661 assert(cb_fn != NULL); 1662 1663 if (lvol == NULL) { 1664 SPDK_ERRLOG("Lvol does not exist\n"); 1665 cb_fn(cb_arg, -ENODEV); 1666 return; 1667 } 1668 1669 req = calloc(1, sizeof(*req)); 1670 if (!req) { 1671 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1672 cb_fn(cb_arg, -ENOMEM); 1673 return; 1674 } 1675 1676 req->cb_fn = cb_fn; 1677 req->cb_arg = cb_arg; 1678 req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore); 1679 if (req->channel == NULL) { 1680 SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n"); 1681 free(req); 1682 cb_fn(cb_arg, -ENOMEM); 1683 return; 1684 } 1685 1686 blob_id = spdk_blob_get_id(lvol->blob); 1687 spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, lvol_inflate_cb, 1688 req); 1689 } 1690 1691 void 1692 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg) 1693 { 1694 struct spdk_lvol_req *req; 1695 spdk_blob_id blob_id; 1696 1697 assert(cb_fn != NULL); 1698 1699 if (lvol == NULL) { 1700 SPDK_ERRLOG("Lvol does not exist\n"); 1701 cb_fn(cb_arg, -ENODEV); 1702 return; 1703 } 1704 1705 req = calloc(1, sizeof(*req)); 1706 if (!req) { 1707 SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n"); 1708 cb_fn(cb_arg, -ENOMEM); 1709 return; 1710 } 1711 1712 req->cb_fn = cb_fn; 1713 req->cb_arg = cb_arg; 1714 req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore); 1715 if (req->channel == NULL) { 1716 SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n"); 1717 free(req); 1718 cb_fn(cb_arg, -ENOMEM); 1719 return; 1720 } 1721 1722 blob_id = spdk_blob_get_id(lvol->blob); 1723 spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id, 1724 lvol_inflate_cb, req); 1725 } 1726 1727 void 1728 spdk_lvs_grow(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg) 1729 { 1730 struct spdk_lvs_with_handle_req *req; 1731 struct spdk_bs_opts opts = {}; 1732 1733 assert(cb_fn != NULL); 1734 1735 if (bs_dev == NULL) { 1736 SPDK_ERRLOG("Blobstore device does not exist\n"); 1737 cb_fn(cb_arg, NULL, -ENODEV); 1738 return; 1739 } 1740 1741 req = calloc(1, sizeof(*req)); 1742 if (req == NULL) { 1743 SPDK_ERRLOG("Cannot alloc memory for request structure\n"); 1744 cb_fn(cb_arg, NULL, -ENOMEM); 1745 return; 1746 } 1747 1748 req->lvol_store = lvs_alloc(); 1749 if (req->lvol_store == NULL) { 1750 SPDK_ERRLOG("Cannot alloc memory for lvol store\n"); 1751 free(req); 1752 cb_fn(cb_arg, NULL, -ENOMEM); 1753 return; 1754 } 1755 req->cb_fn = cb_fn; 1756 req->cb_arg = cb_arg; 1757 req->bs_dev = bs_dev; 1758 1759 lvs_bs_opts_init(&opts); 1760 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE"); 1761 1762 spdk_bs_grow(bs_dev, &opts, lvs_load_cb, req); 1763 } 1764 1765 static struct spdk_lvol * 1766 lvs_get_lvol_by_blob_id(struct spdk_lvol_store *lvs, spdk_blob_id blob_id) 1767 { 1768 struct spdk_lvol *lvol; 1769 1770 TAILQ_FOREACH(lvol, &lvs->lvols, link) { 1771 if (lvol->blob_id == blob_id) { 1772 return lvol; 1773 } 1774 } 1775 return NULL; 1776 } 1777 1778 static int 1779 lvs_esnap_bs_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob, 1780 const void *esnap_id, uint32_t id_len, 1781 struct spdk_bs_dev **bs_dev) 1782 { 1783 struct spdk_lvol_store *lvs = bs_ctx; 1784 struct spdk_lvol *lvol = blob_ctx; 1785 spdk_blob_id blob_id = spdk_blob_get_id(blob); 1786 1787 if (lvs == NULL) { 1788 if (lvol == NULL) { 1789 SPDK_ERRLOG("Blob 0x%" PRIx64 ": no lvs context nor lvol context\n", 1790 blob_id); 1791 return -EINVAL; 1792 } 1793 lvs = lvol->lvol_store; 1794 } 1795 1796 /* 1797 * When spdk_lvs_load() is called, it iterates through all blobs in its blobstore building 1798 * up a list of lvols (lvs->lvols). During this initial iteration, each blob is opened, 1799 * passed to load_next_lvol(), then closed. There is no need to open the external snapshot 1800 * during this phase. Once the blobstore is loaded, lvs->load_esnaps is set to true so that 1801 * future lvol opens cause the external snapshot to be loaded. 1802 */ 1803 if (!lvs->load_esnaps) { 1804 *bs_dev = NULL; 1805 return 0; 1806 } 1807 1808 if (lvol == NULL) { 1809 spdk_blob_id blob_id = spdk_blob_get_id(blob); 1810 1811 /* 1812 * If spdk_bs_blob_open() is used instead of spdk_bs_blob_open_ext() the lvol will 1813 * not have been passed in. The same is true if the open happens spontaneously due 1814 * to blobstore activity. 1815 */ 1816 lvol = lvs_get_lvol_by_blob_id(lvs, blob_id); 1817 if (lvol == NULL) { 1818 SPDK_ERRLOG("lvstore %s: no lvol for blob 0x%" PRIx64 "\n", 1819 lvs->name, blob_id); 1820 return -ENODEV; 1821 } 1822 } 1823 1824 return lvs->esnap_bs_dev_create(lvs, lvol, blob, esnap_id, id_len, bs_dev); 1825 } 1826 1827 /* 1828 * The theory of missing external snapshots 1829 * 1830 * The lvs->esnap_bs_dev_create() callback may be unable to create an external snapshot bs_dev when 1831 * it is called. This can happen, for instance, as when the device containing the lvolstore is 1832 * examined prior to spdk_bdev_register() being called on a bdev that acts as an external snapshot. 1833 * In such a case, the esnap_bs_dev_create() callback will call spdk_lvs_esnap_missing_add(). 1834 * 1835 * Missing external snapshots are tracked in a per-lvolstore tree, lvs->degraded_lvol_sets_tree. 1836 * Each tree node (struct spdk_lvs_degraded_lvol_set) contains a tailq of lvols that are missing 1837 * that particular external snapshot. 1838 */ 1839 static int 1840 lvs_esnap_name_cmp(struct spdk_lvs_degraded_lvol_set *m1, struct spdk_lvs_degraded_lvol_set *m2) 1841 { 1842 if (m1->id_len == m2->id_len) { 1843 return memcmp(m1->esnap_id, m2->esnap_id, m1->id_len); 1844 } 1845 return (m1->id_len > m2->id_len) ? 1 : -1; 1846 } 1847 1848 RB_GENERATE_STATIC(degraded_lvol_sets_tree, spdk_lvs_degraded_lvol_set, node, lvs_esnap_name_cmp) 1849 1850 static void 1851 lvs_degraded_lvol_set_add(struct spdk_lvs_degraded_lvol_set *degraded_set, struct spdk_lvol *lvol) 1852 { 1853 assert(lvol->lvol_store->thread == spdk_get_thread()); 1854 1855 lvol->degraded_set = degraded_set; 1856 TAILQ_INSERT_TAIL(°raded_set->lvols, lvol, degraded_link); 1857 } 1858 1859 static void 1860 lvs_degraded_lvol_set_remove(struct spdk_lvs_degraded_lvol_set *degraded_set, 1861 struct spdk_lvol *lvol) 1862 { 1863 assert(lvol->lvol_store->thread == spdk_get_thread()); 1864 1865 lvol->degraded_set = NULL; 1866 TAILQ_REMOVE(°raded_set->lvols, lvol, degraded_link); 1867 /* degraded_set->lvols may be empty. Caller should check if not immediately adding a new 1868 * lvol. */ 1869 } 1870 1871 /* 1872 * Record in lvs->degraded_lvol_sets_tree that a bdev of the specified name is needed by the 1873 * specified lvol. 1874 */ 1875 int 1876 spdk_lvs_esnap_missing_add(struct spdk_lvol_store *lvs, struct spdk_lvol *lvol, 1877 const void *esnap_id, uint32_t id_len) 1878 { 1879 struct spdk_lvs_degraded_lvol_set find, *degraded_set; 1880 1881 assert(lvs->thread == spdk_get_thread()); 1882 1883 find.esnap_id = esnap_id; 1884 find.id_len = id_len; 1885 degraded_set = RB_FIND(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, &find); 1886 if (degraded_set == NULL) { 1887 degraded_set = calloc(1, sizeof(*degraded_set)); 1888 if (degraded_set == NULL) { 1889 SPDK_ERRLOG("lvol %s: cannot create degraded_set node: out of memory\n", 1890 lvol->unique_id); 1891 return -ENOMEM; 1892 } 1893 degraded_set->esnap_id = calloc(1, id_len); 1894 if (degraded_set->esnap_id == NULL) { 1895 free(degraded_set); 1896 SPDK_ERRLOG("lvol %s: cannot create degraded_set node: out of memory\n", 1897 lvol->unique_id); 1898 return -ENOMEM; 1899 } 1900 memcpy((void *)degraded_set->esnap_id, esnap_id, id_len); 1901 degraded_set->id_len = id_len; 1902 degraded_set->lvol_store = lvs; 1903 TAILQ_INIT(°raded_set->lvols); 1904 RB_INSERT(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, degraded_set); 1905 } 1906 1907 lvs_degraded_lvol_set_add(degraded_set, lvol); 1908 1909 return 0; 1910 } 1911 1912 /* 1913 * Remove the record of the specified lvol needing a degraded_set bdev. 1914 */ 1915 void 1916 spdk_lvs_esnap_missing_remove(struct spdk_lvol *lvol) 1917 { 1918 struct spdk_lvol_store *lvs = lvol->lvol_store; 1919 struct spdk_lvs_degraded_lvol_set *degraded_set = lvol->degraded_set; 1920 1921 assert(lvs->thread == spdk_get_thread()); 1922 1923 if (degraded_set == NULL) { 1924 return; 1925 } 1926 1927 lvs_degraded_lvol_set_remove(degraded_set, lvol); 1928 1929 if (!TAILQ_EMPTY(°raded_set->lvols)) { 1930 return; 1931 } 1932 1933 RB_REMOVE(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, degraded_set); 1934 1935 free((char *)degraded_set->esnap_id); 1936 free(degraded_set); 1937 } 1938