1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2023 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/env.h" 7 #include "spdk/util.h" 8 #include "spdk/likely.h" 9 #include "spdk/log.h" 10 #include "spdk/thread.h" 11 12 #define IOBUF_MIN_SMALL_POOL_SIZE 64 13 #define IOBUF_MIN_LARGE_POOL_SIZE 8 14 #define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192 15 #define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024 16 #define IOBUF_ALIGNMENT 4096 17 #define IOBUF_MIN_SMALL_BUFSIZE 4096 18 #define IOBUF_MIN_LARGE_BUFSIZE 8192 19 #define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024) 20 /* 132k is a weird choice at first, but this needs to be large enough to accommodate 21 * the default maximum size (128k) plus metadata everywhere. For code paths that 22 * are explicitly configured, the math is instead done properly. This is only 23 * for the default. */ 24 #define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024) 25 #define IOBUF_MAX_CHANNELS 64 26 27 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE, 28 "Invalid data offset"); 29 30 static bool g_iobuf_is_initialized = false; 31 32 struct iobuf_channel_node { 33 spdk_iobuf_entry_stailq_t small_queue; 34 spdk_iobuf_entry_stailq_t large_queue; 35 }; 36 37 struct iobuf_channel { 38 struct iobuf_channel_node node[SPDK_CONFIG_MAX_NUMA_NODES]; 39 struct spdk_iobuf_channel *channels[IOBUF_MAX_CHANNELS]; 40 }; 41 42 struct iobuf_module { 43 char *name; 44 TAILQ_ENTRY(iobuf_module) tailq; 45 }; 46 47 struct iobuf_node { 48 struct spdk_ring *small_pool; 49 struct spdk_ring *large_pool; 50 void *small_pool_base; 51 void *large_pool_base; 52 }; 53 54 struct iobuf { 55 struct spdk_iobuf_opts opts; 56 TAILQ_HEAD(, iobuf_module) modules; 57 spdk_iobuf_finish_cb finish_cb; 58 void *finish_arg; 59 struct iobuf_node node[SPDK_CONFIG_MAX_NUMA_NODES]; 60 }; 61 62 #define IOBUF_FOREACH_NUMA_ID(i) \ 63 for (i = g_iobuf.opts.enable_numa ? spdk_env_get_first_numa_id() : 0; \ 64 i < INT32_MAX; \ 65 i = g_iobuf.opts.enable_numa ? spdk_env_get_next_numa_id(i) : INT32_MAX) 66 67 static struct iobuf g_iobuf = { 68 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 69 .node = {}, 70 .opts = { 71 .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE, 72 .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE, 73 .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE, 74 .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE, 75 }, 76 }; 77 78 struct iobuf_get_stats_ctx { 79 struct spdk_iobuf_module_stats *modules; 80 uint32_t num_modules; 81 spdk_iobuf_get_stats_cb cb_fn; 82 void *cb_arg; 83 }; 84 85 static int 86 iobuf_channel_create_cb(void *io_device, void *ctx) 87 { 88 struct iobuf_channel *ch = ctx; 89 struct iobuf_channel_node *node; 90 int32_t i; 91 92 IOBUF_FOREACH_NUMA_ID(i) { 93 node = &ch->node[i]; 94 STAILQ_INIT(&node->small_queue); 95 STAILQ_INIT(&node->large_queue); 96 } 97 98 return 0; 99 } 100 101 static void 102 iobuf_channel_destroy_cb(void *io_device, void *ctx) 103 { 104 struct iobuf_channel *ch = ctx; 105 struct iobuf_channel_node *node __attribute__((unused)); 106 int32_t i; 107 108 IOBUF_FOREACH_NUMA_ID(i) { 109 node = &ch->node[i]; 110 assert(STAILQ_EMPTY(&node->small_queue)); 111 assert(STAILQ_EMPTY(&node->large_queue)); 112 } 113 } 114 115 static int 116 iobuf_node_initialize(struct iobuf_node *node, uint32_t numa_id) 117 { 118 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 119 struct spdk_iobuf_buffer *buf; 120 uint64_t i; 121 int rc; 122 123 if (!g_iobuf.opts.enable_numa) { 124 numa_id = SPDK_ENV_NUMA_ID_ANY; 125 } 126 127 node->small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count, 128 numa_id); 129 if (!node->small_pool) { 130 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 131 rc = -ENOMEM; 132 goto error; 133 } 134 135 node->small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT, 136 NULL, numa_id, SPDK_MALLOC_DMA); 137 if (node->small_pool_base == NULL) { 138 SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n"); 139 rc = -ENOMEM; 140 goto error; 141 } 142 143 node->large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count, 144 numa_id); 145 if (!node->large_pool) { 146 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 147 rc = -ENOMEM; 148 goto error; 149 } 150 151 node->large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT, 152 NULL, numa_id, SPDK_MALLOC_DMA); 153 if (node->large_pool_base == NULL) { 154 SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n"); 155 rc = -ENOMEM; 156 goto error; 157 } 158 159 for (i = 0; i < opts->small_pool_count; i++) { 160 buf = node->small_pool_base + i * opts->small_bufsize; 161 spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL); 162 } 163 164 for (i = 0; i < opts->large_pool_count; i++) { 165 buf = node->large_pool_base + i * opts->large_bufsize; 166 spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL); 167 } 168 169 return 0; 170 171 error: 172 spdk_free(node->small_pool_base); 173 spdk_ring_free(node->small_pool); 174 spdk_free(node->large_pool_base); 175 spdk_ring_free(node->large_pool); 176 memset(node, 0, sizeof(*node)); 177 178 return rc; 179 } 180 181 static void 182 iobuf_node_free(struct iobuf_node *node) 183 { 184 if (node->small_pool == NULL) { 185 /* This node didn't get allocated, so just return immediately. */ 186 return; 187 } 188 189 if (spdk_ring_count(node->small_pool) != g_iobuf.opts.small_pool_count) { 190 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 191 spdk_ring_count(node->small_pool), g_iobuf.opts.small_pool_count); 192 } 193 194 if (spdk_ring_count(node->large_pool) != g_iobuf.opts.large_pool_count) { 195 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 196 spdk_ring_count(node->large_pool), g_iobuf.opts.large_pool_count); 197 } 198 199 spdk_free(node->small_pool_base); 200 node->small_pool_base = NULL; 201 spdk_ring_free(node->small_pool); 202 node->small_pool = NULL; 203 204 spdk_free(node->large_pool_base); 205 node->large_pool_base = NULL; 206 spdk_ring_free(node->large_pool); 207 node->large_pool = NULL; 208 } 209 210 int 211 spdk_iobuf_initialize(void) 212 { 213 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 214 struct iobuf_node *node; 215 int32_t i; 216 int rc = 0; 217 218 /* Round up to the nearest alignment so that each element remains aligned */ 219 opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT); 220 opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT); 221 222 IOBUF_FOREACH_NUMA_ID(i) { 223 node = &g_iobuf.node[i]; 224 rc = iobuf_node_initialize(node, i); 225 if (rc) { 226 goto err; 227 } 228 } 229 230 spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 231 sizeof(struct iobuf_channel), "iobuf"); 232 g_iobuf_is_initialized = true; 233 234 return 0; 235 236 err: 237 IOBUF_FOREACH_NUMA_ID(i) { 238 node = &g_iobuf.node[i]; 239 iobuf_node_free(node); 240 } 241 return rc; 242 } 243 244 static void 245 iobuf_unregister_cb(void *io_device) 246 { 247 struct iobuf_module *module; 248 struct iobuf_node *node; 249 int32_t i; 250 251 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 252 module = TAILQ_FIRST(&g_iobuf.modules); 253 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 254 free(module->name); 255 free(module); 256 } 257 258 IOBUF_FOREACH_NUMA_ID(i) { 259 node = &g_iobuf.node[i]; 260 iobuf_node_free(node); 261 } 262 263 if (g_iobuf.finish_cb != NULL) { 264 g_iobuf.finish_cb(g_iobuf.finish_arg); 265 } 266 } 267 268 void 269 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 270 { 271 if (!g_iobuf_is_initialized) { 272 cb_fn(cb_arg); 273 return; 274 } 275 276 g_iobuf_is_initialized = false; 277 g_iobuf.finish_cb = cb_fn; 278 g_iobuf.finish_arg = cb_arg; 279 280 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 281 } 282 283 int 284 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 285 { 286 if (!opts) { 287 SPDK_ERRLOG("opts cannot be NULL\n"); 288 return -1; 289 } 290 291 if (!opts->opts_size) { 292 SPDK_ERRLOG("opts_size inside opts cannot be zero value\n"); 293 return -1; 294 } 295 296 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 297 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 298 IOBUF_MIN_SMALL_POOL_SIZE); 299 return -EINVAL; 300 } 301 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 302 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 303 IOBUF_MIN_LARGE_POOL_SIZE); 304 return -EINVAL; 305 } 306 307 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 308 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n", 309 IOBUF_MIN_SMALL_BUFSIZE); 310 return -EINVAL; 311 } 312 313 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 314 SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n", 315 IOBUF_MIN_LARGE_BUFSIZE); 316 return -EINVAL; 317 } 318 319 if (opts->enable_numa && 320 spdk_env_get_last_numa_id() >= SPDK_CONFIG_MAX_NUMA_NODES) { 321 SPDK_ERRLOG("max NUMA ID %" PRIu32 " cannot be supported with " 322 "SPDK_CONFIG_MAX_NUMA_NODES %" PRIu32 "\n", 323 spdk_env_get_last_numa_id(), SPDK_CONFIG_MAX_NUMA_NODES); 324 SPDK_ERRLOG("Re-configure with --max-numa-nodes=%" PRIu32 "\n", 325 spdk_env_get_last_numa_id() + 1); 326 return -EINVAL; 327 } 328 329 #define SET_FIELD(field) \ 330 if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts->opts_size) { \ 331 g_iobuf.opts.field = opts->field; \ 332 } \ 333 334 SET_FIELD(small_pool_count); 335 SET_FIELD(large_pool_count); 336 SET_FIELD(small_bufsize); 337 SET_FIELD(large_bufsize); 338 SET_FIELD(enable_numa); 339 340 g_iobuf.opts.opts_size = opts->opts_size; 341 342 #undef SET_FIELD 343 344 return 0; 345 } 346 347 void 348 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts, size_t opts_size) 349 { 350 if (!opts) { 351 SPDK_ERRLOG("opts should not be NULL\n"); 352 return; 353 } 354 355 if (!opts_size) { 356 SPDK_ERRLOG("opts_size should not be zero value\n"); 357 return; 358 } 359 360 opts->opts_size = opts_size; 361 362 #define SET_FIELD(field) \ 363 if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts_size) { \ 364 opts->field = g_iobuf.opts.field; \ 365 } \ 366 367 SET_FIELD(small_pool_count); 368 SET_FIELD(large_pool_count); 369 SET_FIELD(small_bufsize); 370 SET_FIELD(large_bufsize); 371 SET_FIELD(enable_numa); 372 373 #undef SET_FIELD 374 375 /* Do not remove this statement, you should always update this statement when you adding a new field, 376 * and do not forget to add the SET_FIELD statement for your added field. */ 377 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_opts) == 40, "Incorrect size"); 378 } 379 380 static void 381 iobuf_channel_node_init(struct spdk_iobuf_channel *ch, struct iobuf_channel *iobuf_ch, 382 int32_t numa_id, uint32_t small_cache_size, uint32_t large_cache_size) 383 { 384 struct iobuf_node *node = &g_iobuf.node[numa_id]; 385 struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id]; 386 struct iobuf_channel_node *ch_node = &iobuf_ch->node[numa_id]; 387 388 cache->small.queue = &ch_node->small_queue; 389 cache->large.queue = &ch_node->large_queue; 390 cache->small.pool = node->small_pool; 391 cache->large.pool = node->large_pool; 392 cache->small.bufsize = g_iobuf.opts.small_bufsize; 393 cache->large.bufsize = g_iobuf.opts.large_bufsize; 394 cache->small.cache_size = small_cache_size; 395 cache->large.cache_size = large_cache_size; 396 cache->small.cache_count = 0; 397 cache->large.cache_count = 0; 398 399 STAILQ_INIT(&cache->small.cache); 400 STAILQ_INIT(&cache->large.cache); 401 } 402 403 static int 404 iobuf_channel_node_populate(struct spdk_iobuf_channel *ch, const char *name, int32_t numa_id) 405 { 406 struct iobuf_node *node = &g_iobuf.node[numa_id]; 407 struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id]; 408 uint32_t small_cache_size = cache->small.cache_size; 409 uint32_t large_cache_size = cache->large.cache_size; 410 struct spdk_iobuf_buffer *buf; 411 uint32_t i; 412 413 for (i = 0; i < small_cache_size; ++i) { 414 if (spdk_ring_dequeue(node->small_pool, (void **)&buf, 1) == 0) { 415 SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. " 416 "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n", 417 name, i, small_cache_size, g_iobuf.opts.small_pool_count); 418 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 419 "this value.\n"); 420 return -ENOMEM; 421 } 422 STAILQ_INSERT_TAIL(&cache->small.cache, buf, stailq); 423 cache->small.cache_count++; 424 } 425 for (i = 0; i < large_cache_size; ++i) { 426 if (spdk_ring_dequeue(node->large_pool, (void **)&buf, 1) == 0) { 427 SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. " 428 "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n", 429 name, i, large_cache_size, g_iobuf.opts.large_pool_count); 430 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 431 "this value.\n"); 432 return -ENOMEM; 433 } 434 STAILQ_INSERT_TAIL(&cache->large.cache, buf, stailq); 435 cache->large.cache_count++; 436 } 437 438 return 0; 439 } 440 441 int 442 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 443 uint32_t small_cache_size, uint32_t large_cache_size) 444 { 445 struct spdk_io_channel *ioch; 446 struct iobuf_channel *iobuf_ch; 447 struct iobuf_module *module; 448 uint32_t i; 449 int32_t numa_id; 450 int rc; 451 452 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 453 if (strcmp(name, module->name) == 0) { 454 break; 455 } 456 } 457 458 if (module == NULL) { 459 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 460 return -ENODEV; 461 } 462 463 ioch = spdk_get_io_channel(&g_iobuf); 464 if (ioch == NULL) { 465 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 466 return -ENOMEM; 467 } 468 469 iobuf_ch = spdk_io_channel_get_ctx(ioch); 470 471 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 472 if (iobuf_ch->channels[i] == NULL) { 473 iobuf_ch->channels[i] = ch; 474 break; 475 } 476 } 477 478 if (i == IOBUF_MAX_CHANNELS) { 479 SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i); 480 rc = -ENOMEM; 481 goto error; 482 } 483 484 ch->parent = ioch; 485 ch->module = module; 486 487 IOBUF_FOREACH_NUMA_ID(numa_id) { 488 iobuf_channel_node_init(ch, iobuf_ch, numa_id, 489 small_cache_size, large_cache_size); 490 } 491 492 IOBUF_FOREACH_NUMA_ID(numa_id) { 493 rc = iobuf_channel_node_populate(ch, name, numa_id); 494 if (rc) { 495 goto error; 496 } 497 } 498 499 return 0; 500 error: 501 spdk_iobuf_channel_fini(ch); 502 503 return rc; 504 } 505 506 static void 507 iobuf_channel_node_fini(struct spdk_iobuf_channel *ch, int32_t numa_id) 508 { 509 struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id]; 510 struct iobuf_node *node = &g_iobuf.node[numa_id]; 511 struct spdk_iobuf_entry *entry __attribute__((unused)); 512 struct spdk_iobuf_buffer *buf; 513 514 /* Make sure none of the wait queue entries are coming from this module */ 515 STAILQ_FOREACH(entry, cache->small.queue, stailq) { 516 assert(entry->module != ch->module); 517 } 518 STAILQ_FOREACH(entry, cache->large.queue, stailq) { 519 assert(entry->module != ch->module); 520 } 521 522 /* Release cached buffers back to the pool */ 523 while (!STAILQ_EMPTY(&cache->small.cache)) { 524 buf = STAILQ_FIRST(&cache->small.cache); 525 STAILQ_REMOVE_HEAD(&cache->small.cache, stailq); 526 spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL); 527 cache->small.cache_count--; 528 } 529 while (!STAILQ_EMPTY(&cache->large.cache)) { 530 buf = STAILQ_FIRST(&cache->large.cache); 531 STAILQ_REMOVE_HEAD(&cache->large.cache, stailq); 532 spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL); 533 cache->large.cache_count--; 534 } 535 536 assert(cache->small.cache_count == 0); 537 assert(cache->large.cache_count == 0); 538 } 539 540 void 541 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 542 { 543 struct iobuf_channel *iobuf_ch; 544 uint32_t i; 545 546 IOBUF_FOREACH_NUMA_ID(i) { 547 iobuf_channel_node_fini(ch, i); 548 } 549 550 iobuf_ch = spdk_io_channel_get_ctx(ch->parent); 551 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 552 if (iobuf_ch->channels[i] == ch) { 553 iobuf_ch->channels[i] = NULL; 554 break; 555 } 556 } 557 558 spdk_put_io_channel(ch->parent); 559 ch->parent = NULL; 560 } 561 562 int 563 spdk_iobuf_register_module(const char *name) 564 { 565 struct iobuf_module *module; 566 567 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 568 if (strcmp(name, module->name) == 0) { 569 return -EEXIST; 570 } 571 } 572 573 module = calloc(1, sizeof(*module)); 574 if (module == NULL) { 575 return -ENOMEM; 576 } 577 578 module->name = strdup(name); 579 if (module->name == NULL) { 580 free(module); 581 return -ENOMEM; 582 } 583 584 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 585 586 return 0; 587 } 588 589 int 590 spdk_iobuf_unregister_module(const char *name) 591 { 592 struct iobuf_module *module; 593 594 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 595 if (strcmp(name, module->name) == 0) { 596 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 597 free(module->name); 598 free(module); 599 return 0; 600 } 601 } 602 603 return -ENOENT; 604 } 605 606 static int 607 iobuf_pool_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool_cache *pool, 608 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 609 { 610 struct spdk_iobuf_entry *entry, *tmp; 611 int rc; 612 613 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 614 /* We only want to iterate over the entries requested by the module which owns ch */ 615 if (entry->module != ch->module) { 616 continue; 617 } 618 619 rc = cb_fn(ch, entry, cb_ctx); 620 if (rc != 0) { 621 return rc; 622 } 623 } 624 625 return 0; 626 } 627 628 int 629 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, 630 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 631 { 632 struct spdk_iobuf_node_cache *cache; 633 uint32_t i; 634 int rc; 635 636 IOBUF_FOREACH_NUMA_ID(i) { 637 cache = &ch->cache[i]; 638 639 rc = iobuf_pool_for_each_entry(ch, &cache->small, cb_fn, cb_ctx); 640 if (rc != 0) { 641 return rc; 642 } 643 rc = iobuf_pool_for_each_entry(ch, &cache->large, cb_fn, cb_ctx); 644 if (rc != 0) { 645 return rc; 646 } 647 } 648 649 return 0; 650 } 651 652 static bool 653 iobuf_entry_abort_node(struct spdk_iobuf_channel *ch, int32_t numa_id, 654 struct spdk_iobuf_entry *entry, uint64_t len) 655 { 656 struct spdk_iobuf_node_cache *cache; 657 struct spdk_iobuf_pool_cache *pool; 658 struct spdk_iobuf_entry *e; 659 660 cache = &ch->cache[numa_id]; 661 662 if (len <= cache->small.bufsize) { 663 pool = &cache->small; 664 } else { 665 assert(len <= cache->large.bufsize); 666 pool = &cache->large; 667 } 668 669 STAILQ_FOREACH(e, pool->queue, stailq) { 670 if (e == entry) { 671 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 672 return true; 673 } 674 } 675 676 return false; 677 } 678 679 void 680 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 681 uint64_t len) 682 { 683 uint32_t i; 684 685 IOBUF_FOREACH_NUMA_ID(i) { 686 iobuf_entry_abort_node(ch, i, entry, len); 687 } 688 } 689 690 #define IOBUF_BATCH_SIZE 32 691 692 void * 693 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, 694 struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) 695 { 696 struct spdk_iobuf_node_cache *cache; 697 struct spdk_iobuf_pool_cache *pool; 698 void *buf; 699 700 cache = &ch->cache[0]; 701 702 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 703 if (len <= cache->small.bufsize) { 704 pool = &cache->small; 705 } else { 706 assert(len <= cache->large.bufsize); 707 pool = &cache->large; 708 } 709 710 buf = (void *)STAILQ_FIRST(&pool->cache); 711 if (buf) { 712 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 713 assert(pool->cache_count > 0); 714 pool->cache_count--; 715 pool->stats.cache++; 716 } else { 717 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 718 size_t sz, i; 719 720 /* If we're going to dequeue, we may as well dequeue a batch. */ 721 sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE, 722 spdk_max(pool->cache_size, 1))); 723 if (sz == 0) { 724 if (entry) { 725 STAILQ_INSERT_TAIL(pool->queue, entry, stailq); 726 entry->module = ch->module; 727 entry->cb_fn = cb_fn; 728 pool->stats.retry++; 729 } 730 731 return NULL; 732 } 733 734 pool->stats.main++; 735 for (i = 0; i < (sz - 1); i++) { 736 STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq); 737 pool->cache_count++; 738 } 739 740 /* The last one is the one we'll return */ 741 buf = bufs[i]; 742 } 743 744 return (char *)buf; 745 } 746 747 void 748 spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) 749 { 750 struct spdk_iobuf_entry *entry; 751 struct spdk_iobuf_buffer *iobuf_buf; 752 struct spdk_iobuf_node_cache *cache; 753 struct spdk_iobuf_pool_cache *pool; 754 uint32_t numa_id; 755 size_t sz; 756 757 if (g_iobuf.opts.enable_numa) { 758 numa_id = spdk_mem_get_numa_id(buf, NULL); 759 } else { 760 numa_id = 0; 761 } 762 763 cache = &ch->cache[numa_id]; 764 765 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 766 if (len <= cache->small.bufsize) { 767 pool = &cache->small; 768 } else { 769 pool = &cache->large; 770 } 771 772 if (STAILQ_EMPTY(pool->queue)) { 773 if (pool->cache_size == 0) { 774 spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL); 775 return; 776 } 777 778 iobuf_buf = (struct spdk_iobuf_buffer *)buf; 779 780 STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq); 781 pool->cache_count++; 782 783 /* The cache size may exceed the configured amount. We always dequeue from the 784 * central pool in batches of known size, so wait until at least a batch 785 * has been returned to actually return the buffers to the central pool. */ 786 sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size); 787 if (pool->cache_count >= pool->cache_size + sz) { 788 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 789 size_t i; 790 791 for (i = 0; i < sz; i++) { 792 bufs[i] = STAILQ_FIRST(&pool->cache); 793 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 794 assert(pool->cache_count > 0); 795 pool->cache_count--; 796 } 797 798 spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL); 799 } 800 } else { 801 entry = STAILQ_FIRST(pool->queue); 802 STAILQ_REMOVE_HEAD(pool->queue, stailq); 803 entry->cb_fn(entry, buf); 804 if (spdk_unlikely(entry == STAILQ_LAST(pool->queue, spdk_iobuf_entry, stailq))) { 805 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 806 STAILQ_INSERT_HEAD(pool->queue, entry, stailq); 807 } 808 } 809 } 810 811 static void 812 iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status) 813 { 814 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 815 816 ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg); 817 free(ctx->modules); 818 free(ctx); 819 } 820 821 static void 822 iobuf_get_channel_stats(struct spdk_io_channel_iter *iter) 823 { 824 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 825 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter); 826 struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch); 827 struct spdk_iobuf_channel *channel; 828 struct iobuf_module *module; 829 struct spdk_iobuf_module_stats *it; 830 uint32_t i, j; 831 832 for (i = 0; i < ctx->num_modules; ++i) { 833 for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) { 834 channel = iobuf_ch->channels[j]; 835 if (channel == NULL) { 836 continue; 837 } 838 839 it = &ctx->modules[i]; 840 module = (struct iobuf_module *)channel->module; 841 if (strcmp(it->module, module->name) == 0) { 842 struct spdk_iobuf_pool_cache *cache; 843 uint32_t i; 844 845 IOBUF_FOREACH_NUMA_ID(i) { 846 cache = &channel->cache[i].small; 847 it->small_pool.cache += cache->stats.cache; 848 it->small_pool.main += cache->stats.main; 849 it->small_pool.retry += cache->stats.retry; 850 851 cache = &channel->cache[i].large; 852 it->large_pool.cache += cache->stats.cache; 853 it->large_pool.main += cache->stats.main; 854 it->large_pool.retry += cache->stats.retry; 855 } 856 break; 857 } 858 } 859 } 860 861 spdk_for_each_channel_continue(iter, 0); 862 } 863 864 int 865 spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg) 866 { 867 struct iobuf_module *module; 868 struct iobuf_get_stats_ctx *ctx; 869 uint32_t i; 870 871 ctx = calloc(1, sizeof(*ctx)); 872 if (ctx == NULL) { 873 return -ENOMEM; 874 } 875 876 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 877 ++ctx->num_modules; 878 } 879 880 ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats)); 881 if (ctx->modules == NULL) { 882 free(ctx); 883 return -ENOMEM; 884 } 885 886 i = 0; 887 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 888 ctx->modules[i].module = module->name; 889 ++i; 890 } 891 892 ctx->cb_fn = cb_fn; 893 ctx->cb_arg = cb_arg; 894 895 spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx, 896 iobuf_get_channel_stats_done); 897 return 0; 898 } 899