1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2023 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/env.h" 7 #include "spdk/util.h" 8 #include "spdk/likely.h" 9 #include "spdk/log.h" 10 #include "spdk/thread.h" 11 12 #define IOBUF_MIN_SMALL_POOL_SIZE 64 13 #define IOBUF_MIN_LARGE_POOL_SIZE 8 14 #define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192 15 #define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024 16 #define IOBUF_ALIGNMENT 4096 17 #define IOBUF_MIN_SMALL_BUFSIZE 4096 18 #define IOBUF_MIN_LARGE_BUFSIZE 8192 19 #define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024) 20 /* 132k is a weird choice at first, but this needs to be large enough to accommodate 21 * the default maximum size (128k) plus metadata everywhere. For code paths that 22 * are explicitly configured, the math is instead done properly. This is only 23 * for the default. */ 24 #define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024) 25 #define IOBUF_MAX_CHANNELS 64 26 27 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE, 28 "Invalid data offset"); 29 30 static bool g_iobuf_is_initialized = false; 31 32 struct iobuf_channel { 33 spdk_iobuf_entry_stailq_t small_queue; 34 spdk_iobuf_entry_stailq_t large_queue; 35 struct spdk_iobuf_channel *channels[IOBUF_MAX_CHANNELS]; 36 }; 37 38 struct iobuf_module { 39 char *name; 40 TAILQ_ENTRY(iobuf_module) tailq; 41 }; 42 43 struct iobuf { 44 struct spdk_ring *small_pool; 45 struct spdk_ring *large_pool; 46 void *small_pool_base; 47 void *large_pool_base; 48 struct spdk_iobuf_opts opts; 49 TAILQ_HEAD(, iobuf_module) modules; 50 spdk_iobuf_finish_cb finish_cb; 51 void *finish_arg; 52 }; 53 54 static struct iobuf g_iobuf = { 55 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 56 .small_pool = NULL, 57 .large_pool = NULL, 58 .small_pool_base = NULL, 59 .large_pool_base = NULL, 60 .opts = { 61 .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE, 62 .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE, 63 .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE, 64 .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE, 65 }, 66 }; 67 68 struct iobuf_get_stats_ctx { 69 struct spdk_iobuf_module_stats *modules; 70 uint32_t num_modules; 71 spdk_iobuf_get_stats_cb cb_fn; 72 void *cb_arg; 73 }; 74 75 static int 76 iobuf_channel_create_cb(void *io_device, void *ctx) 77 { 78 struct iobuf_channel *ch = ctx; 79 80 STAILQ_INIT(&ch->small_queue); 81 STAILQ_INIT(&ch->large_queue); 82 83 return 0; 84 } 85 86 static void 87 iobuf_channel_destroy_cb(void *io_device, void *ctx) 88 { 89 struct iobuf_channel *ch __attribute__((unused)) = ctx; 90 91 assert(STAILQ_EMPTY(&ch->small_queue)); 92 assert(STAILQ_EMPTY(&ch->large_queue)); 93 } 94 95 int 96 spdk_iobuf_initialize(void) 97 { 98 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 99 int rc = 0; 100 uint64_t i; 101 struct spdk_iobuf_buffer *buf; 102 103 g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count, 104 SPDK_ENV_SOCKET_ID_ANY); 105 if (!g_iobuf.small_pool) { 106 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 107 rc = -ENOMEM; 108 goto error; 109 } 110 111 /* Round up to the nearest alignment so that each element remains aligned */ 112 opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT); 113 g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT, 114 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 115 if (g_iobuf.small_pool_base == NULL) { 116 SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n"); 117 rc = -ENOMEM; 118 goto error; 119 } 120 121 g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count, 122 SPDK_ENV_SOCKET_ID_ANY); 123 if (!g_iobuf.large_pool) { 124 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 125 rc = -ENOMEM; 126 goto error; 127 } 128 129 /* Round up to the nearest alignment so that each element remains aligned */ 130 opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT); 131 g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT, 132 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 133 if (g_iobuf.large_pool_base == NULL) { 134 SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n"); 135 rc = -ENOMEM; 136 goto error; 137 } 138 139 for (i = 0; i < opts->small_pool_count; i++) { 140 buf = g_iobuf.small_pool_base + i * opts->small_bufsize; 141 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 142 } 143 144 for (i = 0; i < opts->large_pool_count; i++) { 145 buf = g_iobuf.large_pool_base + i * opts->large_bufsize; 146 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 147 } 148 149 spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 150 sizeof(struct iobuf_channel), "iobuf"); 151 g_iobuf_is_initialized = true; 152 153 return 0; 154 error: 155 spdk_free(g_iobuf.small_pool_base); 156 spdk_ring_free(g_iobuf.small_pool); 157 spdk_free(g_iobuf.large_pool_base); 158 spdk_ring_free(g_iobuf.large_pool); 159 160 return rc; 161 } 162 163 static void 164 iobuf_unregister_cb(void *io_device) 165 { 166 struct iobuf_module *module; 167 168 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 169 module = TAILQ_FIRST(&g_iobuf.modules); 170 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 171 free(module->name); 172 free(module); 173 } 174 175 if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { 176 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 177 spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); 178 } 179 180 if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { 181 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 182 spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); 183 } 184 185 spdk_free(g_iobuf.small_pool_base); 186 g_iobuf.small_pool_base = NULL; 187 spdk_ring_free(g_iobuf.small_pool); 188 g_iobuf.small_pool = NULL; 189 190 spdk_free(g_iobuf.large_pool_base); 191 g_iobuf.large_pool_base = NULL; 192 spdk_ring_free(g_iobuf.large_pool); 193 g_iobuf.large_pool = NULL; 194 195 if (g_iobuf.finish_cb != NULL) { 196 g_iobuf.finish_cb(g_iobuf.finish_arg); 197 } 198 } 199 200 void 201 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 202 { 203 if (!g_iobuf_is_initialized) { 204 cb_fn(cb_arg); 205 return; 206 } 207 208 g_iobuf_is_initialized = false; 209 g_iobuf.finish_cb = cb_fn; 210 g_iobuf.finish_arg = cb_arg; 211 212 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 213 } 214 215 int 216 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 217 { 218 if (!opts) { 219 SPDK_ERRLOG("opts cannot be NULL\n"); 220 return -1; 221 } 222 223 if (!opts->opts_size) { 224 SPDK_ERRLOG("opts_size inside opts cannot be zero value\n"); 225 return -1; 226 } 227 228 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 229 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 230 IOBUF_MIN_SMALL_POOL_SIZE); 231 return -EINVAL; 232 } 233 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 234 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 235 IOBUF_MIN_LARGE_POOL_SIZE); 236 return -EINVAL; 237 } 238 239 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 240 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n", 241 IOBUF_MIN_SMALL_BUFSIZE); 242 return -EINVAL; 243 } 244 245 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 246 SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n", 247 IOBUF_MIN_LARGE_BUFSIZE); 248 return -EINVAL; 249 } 250 251 #define SET_FIELD(field) \ 252 if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts->opts_size) { \ 253 g_iobuf.opts.field = opts->field; \ 254 } \ 255 256 SET_FIELD(small_pool_count); 257 SET_FIELD(large_pool_count); 258 SET_FIELD(small_bufsize); 259 SET_FIELD(large_bufsize); 260 261 g_iobuf.opts.opts_size = opts->opts_size; 262 263 #undef SET_FIELD 264 265 return 0; 266 } 267 268 void 269 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts, size_t opts_size) 270 { 271 if (!opts) { 272 SPDK_ERRLOG("opts should not be NULL\n"); 273 return; 274 } 275 276 if (!opts_size) { 277 SPDK_ERRLOG("opts_size should not be zero value\n"); 278 return; 279 } 280 281 opts->opts_size = opts_size; 282 283 #define SET_FIELD(field) \ 284 if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts_size) { \ 285 opts->field = g_iobuf.opts.field; \ 286 } \ 287 288 SET_FIELD(small_pool_count); 289 SET_FIELD(large_pool_count); 290 SET_FIELD(small_bufsize); 291 SET_FIELD(large_bufsize); 292 293 #undef SET_FIELD 294 295 /* Do not remove this statement, you should always update this statement when you adding a new field, 296 * and do not forget to add the SET_FIELD statement for your added field. */ 297 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_opts) == 32, "Incorrect size"); 298 } 299 300 301 int 302 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 303 uint32_t small_cache_size, uint32_t large_cache_size) 304 { 305 struct spdk_io_channel *ioch; 306 struct iobuf_channel *iobuf_ch; 307 struct iobuf_module *module; 308 struct spdk_iobuf_buffer *buf; 309 uint32_t i; 310 311 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 312 if (strcmp(name, module->name) == 0) { 313 break; 314 } 315 } 316 317 if (module == NULL) { 318 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 319 return -ENODEV; 320 } 321 322 ioch = spdk_get_io_channel(&g_iobuf); 323 if (ioch == NULL) { 324 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 325 return -ENOMEM; 326 } 327 328 iobuf_ch = spdk_io_channel_get_ctx(ioch); 329 330 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 331 if (iobuf_ch->channels[i] == NULL) { 332 iobuf_ch->channels[i] = ch; 333 break; 334 } 335 } 336 337 if (i == IOBUF_MAX_CHANNELS) { 338 SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i); 339 goto error; 340 } 341 342 ch->small.queue = &iobuf_ch->small_queue; 343 ch->large.queue = &iobuf_ch->large_queue; 344 ch->small.pool = g_iobuf.small_pool; 345 ch->large.pool = g_iobuf.large_pool; 346 ch->small.bufsize = g_iobuf.opts.small_bufsize; 347 ch->large.bufsize = g_iobuf.opts.large_bufsize; 348 ch->parent = ioch; 349 ch->module = module; 350 ch->small.cache_size = small_cache_size; 351 ch->large.cache_size = large_cache_size; 352 ch->small.cache_count = 0; 353 ch->large.cache_count = 0; 354 355 STAILQ_INIT(&ch->small.cache); 356 STAILQ_INIT(&ch->large.cache); 357 358 for (i = 0; i < small_cache_size; ++i) { 359 if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) { 360 SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. " 361 "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n", 362 name, i, small_cache_size, g_iobuf.opts.small_pool_count); 363 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 364 "this value.\n"); 365 goto error; 366 } 367 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 368 ch->small.cache_count++; 369 } 370 for (i = 0; i < large_cache_size; ++i) { 371 if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) { 372 SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. " 373 "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n", 374 name, i, large_cache_size, g_iobuf.opts.large_pool_count); 375 SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate " 376 "this value.\n"); 377 goto error; 378 } 379 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 380 ch->large.cache_count++; 381 } 382 383 return 0; 384 error: 385 spdk_iobuf_channel_fini(ch); 386 387 return -ENOMEM; 388 } 389 390 void 391 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 392 { 393 struct spdk_iobuf_entry *entry __attribute__((unused)); 394 struct spdk_iobuf_buffer *buf; 395 struct iobuf_channel *iobuf_ch; 396 uint32_t i; 397 398 /* Make sure none of the wait queue entries are coming from this module */ 399 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 400 assert(entry->module != ch->module); 401 } 402 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 403 assert(entry->module != ch->module); 404 } 405 406 /* Release cached buffers back to the pool */ 407 while (!STAILQ_EMPTY(&ch->small.cache)) { 408 buf = STAILQ_FIRST(&ch->small.cache); 409 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 410 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 411 ch->small.cache_count--; 412 } 413 while (!STAILQ_EMPTY(&ch->large.cache)) { 414 buf = STAILQ_FIRST(&ch->large.cache); 415 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 416 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 417 ch->large.cache_count--; 418 } 419 420 assert(ch->small.cache_count == 0); 421 assert(ch->large.cache_count == 0); 422 423 iobuf_ch = spdk_io_channel_get_ctx(ch->parent); 424 for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) { 425 if (iobuf_ch->channels[i] == ch) { 426 iobuf_ch->channels[i] = NULL; 427 break; 428 } 429 } 430 431 spdk_put_io_channel(ch->parent); 432 ch->parent = NULL; 433 } 434 435 int 436 spdk_iobuf_register_module(const char *name) 437 { 438 struct iobuf_module *module; 439 440 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 441 if (strcmp(name, module->name) == 0) { 442 return -EEXIST; 443 } 444 } 445 446 module = calloc(1, sizeof(*module)); 447 if (module == NULL) { 448 return -ENOMEM; 449 } 450 451 module->name = strdup(name); 452 if (module->name == NULL) { 453 free(module); 454 return -ENOMEM; 455 } 456 457 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 458 459 return 0; 460 } 461 462 int 463 spdk_iobuf_unregister_module(const char *name) 464 { 465 struct iobuf_module *module; 466 467 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 468 if (strcmp(name, module->name) == 0) { 469 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 470 free(module->name); 471 free(module); 472 return 0; 473 } 474 } 475 476 return -ENOENT; 477 } 478 479 int 480 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 481 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 482 { 483 struct spdk_iobuf_entry *entry, *tmp; 484 int rc; 485 486 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 487 /* We only want to iterate over the entries requested by the module which owns ch */ 488 if (entry->module != ch->module) { 489 continue; 490 } 491 492 rc = cb_fn(ch, entry, cb_ctx); 493 if (rc != 0) { 494 return rc; 495 } 496 } 497 498 return 0; 499 } 500 501 void 502 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 503 uint64_t len) 504 { 505 struct spdk_iobuf_pool *pool; 506 507 if (len <= ch->small.bufsize) { 508 pool = &ch->small; 509 } else { 510 assert(len <= ch->large.bufsize); 511 pool = &ch->large; 512 } 513 514 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 515 } 516 517 #define IOBUF_BATCH_SIZE 32 518 519 void * 520 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, 521 struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) 522 { 523 struct spdk_iobuf_pool *pool; 524 void *buf; 525 526 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 527 if (len <= ch->small.bufsize) { 528 pool = &ch->small; 529 } else { 530 assert(len <= ch->large.bufsize); 531 pool = &ch->large; 532 } 533 534 buf = (void *)STAILQ_FIRST(&pool->cache); 535 if (buf) { 536 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 537 assert(pool->cache_count > 0); 538 pool->cache_count--; 539 pool->stats.cache++; 540 } else { 541 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 542 size_t sz, i; 543 544 /* If we're going to dequeue, we may as well dequeue a batch. */ 545 sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE, 546 spdk_max(pool->cache_size, 1))); 547 if (sz == 0) { 548 if (entry) { 549 STAILQ_INSERT_TAIL(pool->queue, entry, stailq); 550 entry->module = ch->module; 551 entry->cb_fn = cb_fn; 552 pool->stats.retry++; 553 } 554 555 return NULL; 556 } 557 558 pool->stats.main++; 559 for (i = 0; i < (sz - 1); i++) { 560 STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq); 561 pool->cache_count++; 562 } 563 564 /* The last one is the one we'll return */ 565 buf = bufs[i]; 566 } 567 568 return (char *)buf; 569 } 570 571 void 572 spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) 573 { 574 struct spdk_iobuf_entry *entry; 575 struct spdk_iobuf_buffer *iobuf_buf; 576 struct spdk_iobuf_pool *pool; 577 size_t sz; 578 579 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 580 if (len <= ch->small.bufsize) { 581 pool = &ch->small; 582 } else { 583 pool = &ch->large; 584 } 585 586 if (STAILQ_EMPTY(pool->queue)) { 587 if (pool->cache_size == 0) { 588 spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL); 589 return; 590 } 591 592 iobuf_buf = (struct spdk_iobuf_buffer *)buf; 593 594 STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq); 595 pool->cache_count++; 596 597 /* The cache size may exceed the configured amount. We always dequeue from the 598 * central pool in batches of known size, so wait until at least a batch 599 * has been returned to actually return the buffers to the central pool. */ 600 sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size); 601 if (pool->cache_count >= pool->cache_size + sz) { 602 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 603 size_t i; 604 605 for (i = 0; i < sz; i++) { 606 bufs[i] = STAILQ_FIRST(&pool->cache); 607 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 608 assert(pool->cache_count > 0); 609 pool->cache_count--; 610 } 611 612 spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL); 613 } 614 } else { 615 entry = STAILQ_FIRST(pool->queue); 616 STAILQ_REMOVE_HEAD(pool->queue, stailq); 617 entry->cb_fn(entry, buf); 618 if (spdk_unlikely(entry == STAILQ_LAST(pool->queue, spdk_iobuf_entry, stailq))) { 619 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 620 STAILQ_INSERT_HEAD(pool->queue, entry, stailq); 621 } 622 } 623 } 624 625 static void 626 iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status) 627 { 628 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 629 630 ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg); 631 free(ctx->modules); 632 free(ctx); 633 } 634 635 static void 636 iobuf_get_channel_stats(struct spdk_io_channel_iter *iter) 637 { 638 struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter); 639 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter); 640 struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch); 641 struct spdk_iobuf_channel *channel; 642 struct iobuf_module *module; 643 struct spdk_iobuf_module_stats *it; 644 uint32_t i, j; 645 646 for (i = 0; i < ctx->num_modules; ++i) { 647 for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) { 648 channel = iobuf_ch->channels[j]; 649 if (channel == NULL) { 650 continue; 651 } 652 653 it = &ctx->modules[i]; 654 module = (struct iobuf_module *)channel->module; 655 if (strcmp(it->module, module->name) == 0) { 656 it->small_pool.cache += channel->small.stats.cache; 657 it->small_pool.main += channel->small.stats.main; 658 it->small_pool.retry += channel->small.stats.retry; 659 it->large_pool.cache += channel->large.stats.cache; 660 it->large_pool.main += channel->large.stats.main; 661 it->large_pool.retry += channel->large.stats.retry; 662 break; 663 } 664 } 665 } 666 667 spdk_for_each_channel_continue(iter, 0); 668 } 669 670 int 671 spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg) 672 { 673 struct iobuf_module *module; 674 struct iobuf_get_stats_ctx *ctx; 675 uint32_t i; 676 677 ctx = calloc(1, sizeof(*ctx)); 678 if (ctx == NULL) { 679 return -ENOMEM; 680 } 681 682 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 683 ++ctx->num_modules; 684 } 685 686 ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats)); 687 if (ctx->modules == NULL) { 688 free(ctx); 689 return -ENOMEM; 690 } 691 692 i = 0; 693 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 694 ctx->modules[i].module = module->name; 695 ++i; 696 } 697 698 ctx->cb_fn = cb_fn; 699 ctx->cb_arg = cb_arg; 700 701 spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx, 702 iobuf_get_channel_stats_done); 703 return 0; 704 } 705