1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2023 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/env.h" 7 #include "spdk/util.h" 8 #include "spdk/likely.h" 9 #include "spdk/log.h" 10 #include "spdk/thread.h" 11 12 #define IOBUF_MIN_SMALL_POOL_SIZE 64 13 #define IOBUF_MIN_LARGE_POOL_SIZE 8 14 #define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192 15 #define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024 16 #define IOBUF_ALIGNMENT 4096 17 #define IOBUF_MIN_SMALL_BUFSIZE 4096 18 #define IOBUF_MIN_LARGE_BUFSIZE 8192 19 #define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024) 20 /* 132k is a weird choice at first, but this needs to be large enough to accomodate 21 * the default maximum size (128k) plus metadata everywhere. For code paths that 22 * are explicitly configured, the math is instead done properly. This is only 23 * for the default. */ 24 #define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024) 25 #define IOBUF_MAX_CHANNELS 64 26 27 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE, 28 "Invalid data offset"); 29 30 struct iobuf_channel { 31 spdk_iobuf_entry_stailq_t small_queue; 32 spdk_iobuf_entry_stailq_t large_queue; 33 struct spdk_iobuf_channel *channels[IOBUF_MAX_CHANNELS]; 34 }; 35 36 struct iobuf_module { 37 char *name; 38 TAILQ_ENTRY(iobuf_module) tailq; 39 }; 40 41 struct iobuf { 42 struct spdk_ring *small_pool; 43 struct spdk_ring *large_pool; 44 void *small_pool_base; 45 void *large_pool_base; 46 struct spdk_iobuf_opts opts; 47 TAILQ_HEAD(, iobuf_module) modules; 48 spdk_iobuf_finish_cb finish_cb; 49 void *finish_arg; 50 }; 51 52 static struct iobuf g_iobuf = { 53 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 54 .small_pool = NULL, 55 .large_pool = NULL, 56 .small_pool_base = NULL, 57 .large_pool_base = NULL, 58 .opts = { 59 .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE, 60 .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE, 61 .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE, 62 .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE, 63 }, 64 }; 65 66 struct iobuf_get_stats_ctx { 67 struct spdk_iobuf_module_stats *modules; 68 uint32_t num_modules; 69 spdk_iobuf_get_stats_cb cb_fn; 70 void *cb_arg; 71 }; 72 73 static int 74 iobuf_channel_create_cb(void *io_device, void *ctx) 75 { 76 struct iobuf_channel *ch = ctx; 77 78 STAILQ_INIT(&ch->small_queue); 79 STAILQ_INIT(&ch->large_queue); 80 81 return 0; 82 } 83 84 static void 85 iobuf_channel_destroy_cb(void *io_device, void *ctx) 86 { 87 struct iobuf_channel *ch __attribute__((unused)) = ctx; 88 89 assert(STAILQ_EMPTY(&ch->small_queue)); 90 assert(STAILQ_EMPTY(&ch->large_queue)); 91 } 92 93 int 94 spdk_iobuf_initialize(void) 95 { 96 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 97 int rc = 0; 98 uint64_t i; 99 struct spdk_iobuf_buffer *buf; 100 101 g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count, 102 SPDK_ENV_SOCKET_ID_ANY); 103 if (!g_iobuf.small_pool) { 104 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 105 rc = -ENOMEM; 106 goto error; 107 } 108 109 /* Round up to the nearest alignment so that each element remains aligned */ 110 opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT); 111 g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT, 112 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 113 if (g_iobuf.small_pool_base == NULL) { 114 SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n"); 115 rc = -ENOMEM; 116 goto error; 117 } 118 119 g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count, 120 SPDK_ENV_SOCKET_ID_ANY); 121 if (!g_iobuf.large_pool) { 122 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 123 rc = -ENOMEM; 124 goto error; 125 } 126 127 /* Round up to the nearest alignment so that each element remains aligned */ 128 opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT); 129 g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT, 130 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 131 if (g_iobuf.large_pool_base == NULL) { 132 SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n"); 133 rc = -ENOMEM; 134 goto error; 135 } 136 137 for (i = 0; i < opts->small_pool_count; i++) { 138 buf = g_iobuf.small_pool_base + i * opts->small_bufsize; 139 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 140 } 141 142 for (i = 0; i < opts->large_pool_count; i++) { 143 buf = g_iobuf.large_pool_base + i * opts->large_bufsize; 144 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 145 } 146 147 spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 148 sizeof(struct iobuf_channel), "iobuf"); 149 150 return 0; 151 error: 152 spdk_free(g_iobuf.small_pool_base); 153 spdk_ring_free(g_iobuf.small_pool); 154 spdk_free(g_iobuf.large_pool_base); 155 spdk_ring_free(g_iobuf.large_pool); 156 157 return rc; 158 } 159 160 static void 161 iobuf_unregister_cb(void *io_device) 162 { 163 struct iobuf_module *module; 164 165 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 166 module = TAILQ_FIRST(&g_iobuf.modules); 167 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 168 free(module->name); 169 free(module); 170 } 171 172 if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { 173 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 174 spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); 175 } 176 177 if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { 178 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 179 spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); 180 } 181 182 spdk_free(g_iobuf.small_pool_base); 183 g_iobuf.small_pool_base = NULL; 184 spdk_ring_free(g_iobuf.small_pool); 185 g_iobuf.small_pool = NULL; 186 187 spdk_free(g_iobuf.large_pool_base); 188 g_iobuf.large_pool_base = NULL; 189 spdk_ring_free(g_iobuf.large_pool); 190 g_iobuf.large_pool = NULL; 191 192 if (g_iobuf.finish_cb != NULL) { 193 g_iobuf.finish_cb(g_iobuf.finish_arg); 194 } 195 } 196 197 void 198 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 199 { 200 g_iobuf.finish_cb = cb_fn; 201 g_iobuf.finish_arg = cb_arg; 202 203 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 204 } 205 206 int 207 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 208 { 209 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 210 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 211 IOBUF_MIN_SMALL_POOL_SIZE); 212 return -EINVAL; 213 } 214 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 215 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 216 IOBUF_MIN_LARGE_POOL_SIZE); 217 return -EINVAL; 218 } 219 220 g_iobuf.opts = *opts; 221 222 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 223 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 224 IOBUF_MIN_SMALL_BUFSIZE); 225 g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE; 226 } 227 228 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 229 SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 230 IOBUF_MIN_LARGE_BUFSIZE); 231 g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE; 232 } 233 234 return 0; 235 } 236 237 void 238 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) 239 { 240 *opts = g_iobuf.opts; 241 } 242 243 int 244 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 245 uint32_t small_cache_size, uint32_t large_cache_size) 246 { 247 struct spdk_io_channel *ioch; 248 struct iobuf_channel *iobuf_ch; 249 struct iobuf_module *module; 250 struct spdk_iobuf_buffer *buf; 251 uint32_t i; 252 253 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 254 if (strcmp(name, module->name) == 0) { 255 break; 256 } 257 } 258 259 if (module == NULL) { 260 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 261 return -ENODEV; 262 } 263 264 ioch = spdk_get_io_channel(&g_iobuf); 265 if (ioch == NULL) { 266 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 267 return -ENOMEM; 268 } 269 270 iobuf_ch = spdk_io_channel_get_ctx(ioch); 271 272 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 273 if (iobuf_ch->channels[i] == NULL) { 274 iobuf_ch->channels[i] = ch; 275 break; 276 } 277 } 278 279 if (i == IOBUF_MAX_CHANNELS) { 280 SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i); 281 goto error; 282 } 283 284 ch->small.queue = &iobuf_ch->small_queue; 285 ch->large.queue = &iobuf_ch->large_queue; 286 ch->small.pool = g_iobuf.small_pool; 287 ch->large.pool = g_iobuf.large_pool; 288 ch->small.bufsize = g_iobuf.opts.small_bufsize; 289 ch->large.bufsize = g_iobuf.opts.large_bufsize; 290 ch->parent = ioch; 291 ch->module = module; 292 ch->small.cache_size = small_cache_size; 293 ch->large.cache_size = large_cache_size; 294 ch->small.cache_count = 0; 295 ch->large.cache_count = 0; 296 297 STAILQ_INIT(&ch->small.cache); 298 STAILQ_INIT(&ch->large.cache); 299 300 for (i = 0; i < small_cache_size; ++i) { 301 if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) { 302 SPDK_ERRLOG("Failed to populate iobuf small buffer cache. " 303 "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n", 304 g_iobuf.opts.small_pool_count); 305 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 306 "this value.\n"); 307 goto error; 308 } 309 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 310 ch->small.cache_count++; 311 } 312 for (i = 0; i < large_cache_size; ++i) { 313 if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) { 314 SPDK_ERRLOG("Failed to populate iobuf large buffer cache. " 315 "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n", 316 g_iobuf.opts.large_pool_count); 317 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 318 "this value.\n"); 319 goto error; 320 } 321 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 322 ch->large.cache_count++; 323 } 324 325 return 0; 326 error: 327 spdk_iobuf_channel_fini(ch); 328 329 return -ENOMEM; 330 } 331 332 void 333 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 334 { 335 struct spdk_iobuf_entry *entry __attribute__((unused)); 336 struct spdk_iobuf_buffer *buf; 337 struct iobuf_channel *iobuf_ch; 338 uint32_t i; 339 340 /* Make sure none of the wait queue entries are coming from this module */ 341 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 342 assert(entry->module != ch->module); 343 } 344 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 345 assert(entry->module != ch->module); 346 } 347 348 /* Release cached buffers back to the pool */ 349 while (!STAILQ_EMPTY(&ch->small.cache)) { 350 buf = STAILQ_FIRST(&ch->small.cache); 351 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 352 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 353 ch->small.cache_count--; 354 } 355 while (!STAILQ_EMPTY(&ch->large.cache)) { 356 buf = STAILQ_FIRST(&ch->large.cache); 357 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 358 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 359 ch->large.cache_count--; 360 } 361 362 assert(ch->small.cache_count == 0); 363 assert(ch->large.cache_count == 0); 364 365 iobuf_ch = spdk_io_channel_get_ctx(ch->parent); 366 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 367 if (iobuf_ch->channels[i] == ch) { 368 iobuf_ch->channels[i] = NULL; 369 break; 370 } 371 } 372 373 spdk_put_io_channel(ch->parent); 374 ch->parent = NULL; 375 } 376 377 int 378 spdk_iobuf_register_module(const char *name) 379 { 380 struct iobuf_module *module; 381 382 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 383 if (strcmp(name, module->name) == 0) { 384 return -EEXIST; 385 } 386 } 387 388 module = calloc(1, sizeof(*module)); 389 if (module == NULL) { 390 return -ENOMEM; 391 } 392 393 module->name = strdup(name); 394 if (module->name == NULL) { 395 free(module); 396 return -ENOMEM; 397 } 398 399 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 400 401 return 0; 402 } 403 404 int 405 spdk_iobuf_unregister_module(const char *name) 406 { 407 struct iobuf_module *module; 408 409 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 410 if (strcmp(name, module->name) == 0) { 411 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 412 free(module->name); 413 free(module); 414 return 0; 415 } 416 } 417 418 return -ENOENT; 419 } 420 421 int 422 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 423 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 424 { 425 struct spdk_iobuf_entry *entry, *tmp; 426 int rc; 427 428 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 429 /* We only want to iterate over the entries requested by the module which owns ch */ 430 if (entry->module != ch->module) { 431 continue; 432 } 433 434 rc = cb_fn(ch, entry, cb_ctx); 435 if (rc != 0) { 436 return rc; 437 } 438 } 439 440 return 0; 441 } 442 443 void 444 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 445 uint64_t len) 446 { 447 struct spdk_iobuf_pool *pool; 448 449 if (len <= ch->small.bufsize) { 450 pool = &ch->small; 451 } else { 452 assert(len <= ch->large.bufsize); 453 pool = &ch->large; 454 } 455 456 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 457 } 458 459 #define IOBUF_BATCH_SIZE 32 460 461 void * 462 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, 463 struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) 464 { 465 struct spdk_iobuf_pool *pool; 466 void *buf; 467 468 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 469 if (len <= ch->small.bufsize) { 470 pool = &ch->small; 471 } else { 472 assert(len <= ch->large.bufsize); 473 pool = &ch->large; 474 } 475 476 buf = (void *)STAILQ_FIRST(&pool->cache); 477 if (buf) { 478 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 479 assert(pool->cache_count > 0); 480 pool->cache_count--; 481 pool->stats.cache++; 482 } else { 483 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 484 size_t sz, i; 485 486 /* If we're going to dequeue, we may as well dequeue a batch. */ 487 sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE, 488 spdk_max(pool->cache_size, 1))); 489 if (sz == 0) { 490 if (entry) { 491 STAILQ_INSERT_TAIL(pool->queue, entry, stailq); 492 entry->module = ch->module; 493 entry->cb_fn = cb_fn; 494 pool->stats.retry++; 495 } 496 497 return NULL; 498 } 499 500 pool->stats.main++; 501 for (i = 0; i < (sz - 1); i++) { 502 STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq); 503 pool->cache_count++; 504 } 505 506 /* The last one is the one we'll return */ 507 buf = bufs[i]; 508 } 509 510 return (char *)buf; 511 } 512 513 void 514 spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) 515 { 516 struct spdk_iobuf_entry *entry; 517 struct spdk_iobuf_buffer *iobuf_buf; 518 struct spdk_iobuf_pool *pool; 519 size_t sz; 520 521 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 522 if (len <= ch->small.bufsize) { 523 pool = &ch->small; 524 } else { 525 pool = &ch->large; 526 } 527 528 if (STAILQ_EMPTY(pool->queue)) { 529 if (pool->cache_size == 0) { 530 spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL); 531 return; 532 } 533 534 iobuf_buf = (struct spdk_iobuf_buffer *)buf; 535 536 STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq); 537 pool->cache_count++; 538 539 /* The cache size may exceed the configured amount. We always dequeue from the 540 * central pool in batches of known size, so wait until at least a batch 541 * has been returned to actually return the buffers to the central pool. */ 542 sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size); 543 if (pool->cache_count >= pool->cache_size + sz) { 544 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 545 size_t i; 546 547 for (i = 0; i < sz; i++) { 548 bufs[i] = STAILQ_FIRST(&pool->cache); 549 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 550 assert(pool->cache_count > 0); 551 pool->cache_count--; 552 } 553 554 spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL); 555 } 556 } else { 557 entry = STAILQ_FIRST(pool->queue); 558 STAILQ_REMOVE_HEAD(pool->queue, stailq); 559 entry->cb_fn(entry, buf); 560 } 561 } 562 563 static void 564 iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status) 565 { 566 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 567 568 ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg); 569 free(ctx->modules); 570 free(ctx); 571 } 572 573 static void 574 iobuf_get_channel_stats(struct spdk_io_channel_iter *iter) 575 { 576 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 577 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter); 578 struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch); 579 struct spdk_iobuf_channel *channel; 580 struct iobuf_module *module; 581 struct spdk_iobuf_module_stats *it; 582 uint32_t i, j; 583 584 for (i = 0; i < ctx->num_modules; ++i) { 585 for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) { 586 channel = iobuf_ch->channels[j]; 587 if (channel == NULL) { 588 continue; 589 } 590 591 it = &ctx->modules[i]; 592 module = (struct iobuf_module *)channel->module; 593 if (strcmp(it->module, module->name) == 0) { 594 it->small_pool.cache += channel->small.stats.cache; 595 it->small_pool.main += channel->small.stats.main; 596 it->small_pool.retry += channel->small.stats.retry; 597 it->large_pool.cache += channel->large.stats.cache; 598 it->large_pool.main += channel->large.stats.main; 599 it->large_pool.retry += channel->large.stats.retry; 600 break; 601 } 602 } 603 } 604 605 spdk_for_each_channel_continue(iter, 0); 606 } 607 608 int 609 spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg) 610 { 611 struct iobuf_module *module; 612 struct iobuf_get_stats_ctx *ctx; 613 uint32_t i; 614 615 ctx = calloc(1, sizeof(*ctx)); 616 if (ctx == NULL) { 617 return -ENOMEM; 618 } 619 620 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 621 ++ctx->num_modules; 622 } 623 624 ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats)); 625 if (ctx->modules == NULL) { 626 free(ctx); 627 return -ENOMEM; 628 } 629 630 i = 0; 631 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 632 ctx->modules[i].module = module->name; 633 ++i; 634 } 635 636 ctx->cb_fn = cb_fn; 637 ctx->cb_arg = cb_arg; 638 639 spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx, 640 iobuf_get_channel_stats_done); 641 return 0; 642 } 643