1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2023 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/env.h" 7 #include "spdk/util.h" 8 #include "spdk/likely.h" 9 #include "spdk/log.h" 10 #include "spdk/thread.h" 11 12 #define IOBUF_MIN_SMALL_POOL_SIZE 64 13 #define IOBUF_MIN_LARGE_POOL_SIZE 8 14 #define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192 15 #define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024 16 #define IOBUF_ALIGNMENT 4096 17 #define IOBUF_MIN_SMALL_BUFSIZE 4096 18 #define IOBUF_MIN_LARGE_BUFSIZE 8192 19 #define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024) 20 /* 132k is a weird choice at first, but this needs to be large enough to accomodate 21 * the default maximum size (128k) plus metadata everywhere. For code paths that 22 * are explicitly configured, the math is instead done properly. This is only 23 * for the default. */ 24 #define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024) 25 #define IOBUF_MAX_CHANNELS 64 26 27 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE, 28 "Invalid data offset"); 29 30 static bool g_iobuf_is_initialized = false; 31 32 struct iobuf_channel { 33 spdk_iobuf_entry_stailq_t small_queue; 34 spdk_iobuf_entry_stailq_t large_queue; 35 struct spdk_iobuf_channel *channels[IOBUF_MAX_CHANNELS]; 36 }; 37 38 struct iobuf_module { 39 char *name; 40 TAILQ_ENTRY(iobuf_module) tailq; 41 }; 42 43 struct iobuf { 44 struct spdk_ring *small_pool; 45 struct spdk_ring *large_pool; 46 void *small_pool_base; 47 void *large_pool_base; 48 struct spdk_iobuf_opts opts; 49 TAILQ_HEAD(, iobuf_module) modules; 50 spdk_iobuf_finish_cb finish_cb; 51 void *finish_arg; 52 }; 53 54 static struct iobuf g_iobuf = { 55 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 56 .small_pool = NULL, 57 .large_pool = NULL, 58 .small_pool_base = NULL, 59 .large_pool_base = NULL, 60 .opts = { 61 .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE, 62 .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE, 63 .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE, 64 .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE, 65 }, 66 }; 67 68 struct iobuf_get_stats_ctx { 69 struct spdk_iobuf_module_stats *modules; 70 uint32_t num_modules; 71 spdk_iobuf_get_stats_cb cb_fn; 72 void *cb_arg; 73 }; 74 75 static int 76 iobuf_channel_create_cb(void *io_device, void *ctx) 77 { 78 struct iobuf_channel *ch = ctx; 79 80 STAILQ_INIT(&ch->small_queue); 81 STAILQ_INIT(&ch->large_queue); 82 83 return 0; 84 } 85 86 static void 87 iobuf_channel_destroy_cb(void *io_device, void *ctx) 88 { 89 struct iobuf_channel *ch __attribute__((unused)) = ctx; 90 91 assert(STAILQ_EMPTY(&ch->small_queue)); 92 assert(STAILQ_EMPTY(&ch->large_queue)); 93 } 94 95 int 96 spdk_iobuf_initialize(void) 97 { 98 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 99 int rc = 0; 100 uint64_t i; 101 struct spdk_iobuf_buffer *buf; 102 103 g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count, 104 SPDK_ENV_SOCKET_ID_ANY); 105 if (!g_iobuf.small_pool) { 106 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 107 rc = -ENOMEM; 108 goto error; 109 } 110 111 /* Round up to the nearest alignment so that each element remains aligned */ 112 opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT); 113 g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT, 114 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 115 if (g_iobuf.small_pool_base == NULL) { 116 SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n"); 117 rc = -ENOMEM; 118 goto error; 119 } 120 121 g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count, 122 SPDK_ENV_SOCKET_ID_ANY); 123 if (!g_iobuf.large_pool) { 124 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 125 rc = -ENOMEM; 126 goto error; 127 } 128 129 /* Round up to the nearest alignment so that each element remains aligned */ 130 opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT); 131 g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT, 132 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 133 if (g_iobuf.large_pool_base == NULL) { 134 SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n"); 135 rc = -ENOMEM; 136 goto error; 137 } 138 139 for (i = 0; i < opts->small_pool_count; i++) { 140 buf = g_iobuf.small_pool_base + i * opts->small_bufsize; 141 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 142 } 143 144 for (i = 0; i < opts->large_pool_count; i++) { 145 buf = g_iobuf.large_pool_base + i * opts->large_bufsize; 146 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 147 } 148 149 spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 150 sizeof(struct iobuf_channel), "iobuf"); 151 g_iobuf_is_initialized = true; 152 153 return 0; 154 error: 155 spdk_free(g_iobuf.small_pool_base); 156 spdk_ring_free(g_iobuf.small_pool); 157 spdk_free(g_iobuf.large_pool_base); 158 spdk_ring_free(g_iobuf.large_pool); 159 160 return rc; 161 } 162 163 static void 164 iobuf_unregister_cb(void *io_device) 165 { 166 struct iobuf_module *module; 167 168 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 169 module = TAILQ_FIRST(&g_iobuf.modules); 170 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 171 free(module->name); 172 free(module); 173 } 174 175 if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { 176 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 177 spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); 178 } 179 180 if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { 181 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 182 spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); 183 } 184 185 spdk_free(g_iobuf.small_pool_base); 186 g_iobuf.small_pool_base = NULL; 187 spdk_ring_free(g_iobuf.small_pool); 188 g_iobuf.small_pool = NULL; 189 190 spdk_free(g_iobuf.large_pool_base); 191 g_iobuf.large_pool_base = NULL; 192 spdk_ring_free(g_iobuf.large_pool); 193 g_iobuf.large_pool = NULL; 194 195 if (g_iobuf.finish_cb != NULL) { 196 g_iobuf.finish_cb(g_iobuf.finish_arg); 197 } 198 } 199 200 void 201 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 202 { 203 if (!g_iobuf_is_initialized) { 204 cb_fn(cb_arg); 205 return; 206 } 207 208 g_iobuf_is_initialized = false; 209 g_iobuf.finish_cb = cb_fn; 210 g_iobuf.finish_arg = cb_arg; 211 212 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 213 } 214 215 int 216 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 217 { 218 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 219 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 220 IOBUF_MIN_SMALL_POOL_SIZE); 221 return -EINVAL; 222 } 223 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 224 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 225 IOBUF_MIN_LARGE_POOL_SIZE); 226 return -EINVAL; 227 } 228 229 g_iobuf.opts = *opts; 230 231 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 232 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 233 IOBUF_MIN_SMALL_BUFSIZE); 234 g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE; 235 } 236 237 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 238 SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 239 IOBUF_MIN_LARGE_BUFSIZE); 240 g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE; 241 } 242 243 return 0; 244 } 245 246 void 247 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) 248 { 249 *opts = g_iobuf.opts; 250 } 251 252 int 253 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 254 uint32_t small_cache_size, uint32_t large_cache_size) 255 { 256 struct spdk_io_channel *ioch; 257 struct iobuf_channel *iobuf_ch; 258 struct iobuf_module *module; 259 struct spdk_iobuf_buffer *buf; 260 uint32_t i; 261 262 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 263 if (strcmp(name, module->name) == 0) { 264 break; 265 } 266 } 267 268 if (module == NULL) { 269 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 270 return -ENODEV; 271 } 272 273 ioch = spdk_get_io_channel(&g_iobuf); 274 if (ioch == NULL) { 275 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 276 return -ENOMEM; 277 } 278 279 iobuf_ch = spdk_io_channel_get_ctx(ioch); 280 281 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 282 if (iobuf_ch->channels[i] == NULL) { 283 iobuf_ch->channels[i] = ch; 284 break; 285 } 286 } 287 288 if (i == IOBUF_MAX_CHANNELS) { 289 SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i); 290 goto error; 291 } 292 293 ch->small.queue = &iobuf_ch->small_queue; 294 ch->large.queue = &iobuf_ch->large_queue; 295 ch->small.pool = g_iobuf.small_pool; 296 ch->large.pool = g_iobuf.large_pool; 297 ch->small.bufsize = g_iobuf.opts.small_bufsize; 298 ch->large.bufsize = g_iobuf.opts.large_bufsize; 299 ch->parent = ioch; 300 ch->module = module; 301 ch->small.cache_size = small_cache_size; 302 ch->large.cache_size = large_cache_size; 303 ch->small.cache_count = 0; 304 ch->large.cache_count = 0; 305 306 STAILQ_INIT(&ch->small.cache); 307 STAILQ_INIT(&ch->large.cache); 308 309 for (i = 0; i < small_cache_size; ++i) { 310 if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) { 311 SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. " 312 "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n", 313 name, i, small_cache_size, g_iobuf.opts.small_pool_count); 314 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 315 "this value.\n"); 316 goto error; 317 } 318 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 319 ch->small.cache_count++; 320 } 321 for (i = 0; i < large_cache_size; ++i) { 322 if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) { 323 SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. " 324 "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n", 325 name, i, large_cache_size, g_iobuf.opts.large_pool_count); 326 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 327 "this value.\n"); 328 goto error; 329 } 330 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 331 ch->large.cache_count++; 332 } 333 334 return 0; 335 error: 336 spdk_iobuf_channel_fini(ch); 337 338 return -ENOMEM; 339 } 340 341 void 342 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 343 { 344 struct spdk_iobuf_entry *entry __attribute__((unused)); 345 struct spdk_iobuf_buffer *buf; 346 struct iobuf_channel *iobuf_ch; 347 uint32_t i; 348 349 /* Make sure none of the wait queue entries are coming from this module */ 350 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 351 assert(entry->module != ch->module); 352 } 353 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 354 assert(entry->module != ch->module); 355 } 356 357 /* Release cached buffers back to the pool */ 358 while (!STAILQ_EMPTY(&ch->small.cache)) { 359 buf = STAILQ_FIRST(&ch->small.cache); 360 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 361 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 362 ch->small.cache_count--; 363 } 364 while (!STAILQ_EMPTY(&ch->large.cache)) { 365 buf = STAILQ_FIRST(&ch->large.cache); 366 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 367 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 368 ch->large.cache_count--; 369 } 370 371 assert(ch->small.cache_count == 0); 372 assert(ch->large.cache_count == 0); 373 374 iobuf_ch = spdk_io_channel_get_ctx(ch->parent); 375 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 376 if (iobuf_ch->channels[i] == ch) { 377 iobuf_ch->channels[i] = NULL; 378 break; 379 } 380 } 381 382 spdk_put_io_channel(ch->parent); 383 ch->parent = NULL; 384 } 385 386 int 387 spdk_iobuf_register_module(const char *name) 388 { 389 struct iobuf_module *module; 390 391 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 392 if (strcmp(name, module->name) == 0) { 393 return -EEXIST; 394 } 395 } 396 397 module = calloc(1, sizeof(*module)); 398 if (module == NULL) { 399 return -ENOMEM; 400 } 401 402 module->name = strdup(name); 403 if (module->name == NULL) { 404 free(module); 405 return -ENOMEM; 406 } 407 408 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 409 410 return 0; 411 } 412 413 int 414 spdk_iobuf_unregister_module(const char *name) 415 { 416 struct iobuf_module *module; 417 418 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 419 if (strcmp(name, module->name) == 0) { 420 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 421 free(module->name); 422 free(module); 423 return 0; 424 } 425 } 426 427 return -ENOENT; 428 } 429 430 int 431 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 432 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 433 { 434 struct spdk_iobuf_entry *entry, *tmp; 435 int rc; 436 437 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 438 /* We only want to iterate over the entries requested by the module which owns ch */ 439 if (entry->module != ch->module) { 440 continue; 441 } 442 443 rc = cb_fn(ch, entry, cb_ctx); 444 if (rc != 0) { 445 return rc; 446 } 447 } 448 449 return 0; 450 } 451 452 void 453 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 454 uint64_t len) 455 { 456 struct spdk_iobuf_pool *pool; 457 458 if (len <= ch->small.bufsize) { 459 pool = &ch->small; 460 } else { 461 assert(len <= ch->large.bufsize); 462 pool = &ch->large; 463 } 464 465 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 466 } 467 468 #define IOBUF_BATCH_SIZE 32 469 470 void * 471 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, 472 struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) 473 { 474 struct spdk_iobuf_pool *pool; 475 void *buf; 476 477 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 478 if (len <= ch->small.bufsize) { 479 pool = &ch->small; 480 } else { 481 assert(len <= ch->large.bufsize); 482 pool = &ch->large; 483 } 484 485 buf = (void *)STAILQ_FIRST(&pool->cache); 486 if (buf) { 487 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 488 assert(pool->cache_count > 0); 489 pool->cache_count--; 490 pool->stats.cache++; 491 } else { 492 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 493 size_t sz, i; 494 495 /* If we're going to dequeue, we may as well dequeue a batch. */ 496 sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE, 497 spdk_max(pool->cache_size, 1))); 498 if (sz == 0) { 499 if (entry) { 500 STAILQ_INSERT_TAIL(pool->queue, entry, stailq); 501 entry->module = ch->module; 502 entry->cb_fn = cb_fn; 503 pool->stats.retry++; 504 } 505 506 return NULL; 507 } 508 509 pool->stats.main++; 510 for (i = 0; i < (sz - 1); i++) { 511 STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq); 512 pool->cache_count++; 513 } 514 515 /* The last one is the one we'll return */ 516 buf = bufs[i]; 517 } 518 519 return (char *)buf; 520 } 521 522 void 523 spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) 524 { 525 struct spdk_iobuf_entry *entry; 526 struct spdk_iobuf_buffer *iobuf_buf; 527 struct spdk_iobuf_pool *pool; 528 size_t sz; 529 530 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 531 if (len <= ch->small.bufsize) { 532 pool = &ch->small; 533 } else { 534 pool = &ch->large; 535 } 536 537 if (STAILQ_EMPTY(pool->queue)) { 538 if (pool->cache_size == 0) { 539 spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL); 540 return; 541 } 542 543 iobuf_buf = (struct spdk_iobuf_buffer *)buf; 544 545 STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq); 546 pool->cache_count++; 547 548 /* The cache size may exceed the configured amount. We always dequeue from the 549 * central pool in batches of known size, so wait until at least a batch 550 * has been returned to actually return the buffers to the central pool. */ 551 sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size); 552 if (pool->cache_count >= pool->cache_size + sz) { 553 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 554 size_t i; 555 556 for (i = 0; i < sz; i++) { 557 bufs[i] = STAILQ_FIRST(&pool->cache); 558 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 559 assert(pool->cache_count > 0); 560 pool->cache_count--; 561 } 562 563 spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL); 564 } 565 } else { 566 entry = STAILQ_FIRST(pool->queue); 567 STAILQ_REMOVE_HEAD(pool->queue, stailq); 568 entry->cb_fn(entry, buf); 569 } 570 } 571 572 static void 573 iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status) 574 { 575 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 576 577 ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg); 578 free(ctx->modules); 579 free(ctx); 580 } 581 582 static void 583 iobuf_get_channel_stats(struct spdk_io_channel_iter *iter) 584 { 585 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 586 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter); 587 struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch); 588 struct spdk_iobuf_channel *channel; 589 struct iobuf_module *module; 590 struct spdk_iobuf_module_stats *it; 591 uint32_t i, j; 592 593 for (i = 0; i < ctx->num_modules; ++i) { 594 for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) { 595 channel = iobuf_ch->channels[j]; 596 if (channel == NULL) { 597 continue; 598 } 599 600 it = &ctx->modules[i]; 601 module = (struct iobuf_module *)channel->module; 602 if (strcmp(it->module, module->name) == 0) { 603 it->small_pool.cache += channel->small.stats.cache; 604 it->small_pool.main += channel->small.stats.main; 605 it->small_pool.retry += channel->small.stats.retry; 606 it->large_pool.cache += channel->large.stats.cache; 607 it->large_pool.main += channel->large.stats.main; 608 it->large_pool.retry += channel->large.stats.retry; 609 break; 610 } 611 } 612 } 613 614 spdk_for_each_channel_continue(iter, 0); 615 } 616 617 int 618 spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg) 619 { 620 struct iobuf_module *module; 621 struct iobuf_get_stats_ctx *ctx; 622 uint32_t i; 623 624 ctx = calloc(1, sizeof(*ctx)); 625 if (ctx == NULL) { 626 return -ENOMEM; 627 } 628 629 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 630 ++ctx->num_modules; 631 } 632 633 ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats)); 634 if (ctx->modules == NULL) { 635 free(ctx); 636 return -ENOMEM; 637 } 638 639 i = 0; 640 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 641 ctx->modules[i].module = module->name; 642 ++i; 643 } 644 645 ctx->cb_fn = cb_fn; 646 ctx->cb_arg = cb_arg; 647 648 spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx, 649 iobuf_get_channel_stats_done); 650 return 0; 651 } 652