/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2023 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"

#define IOBUF_MIN_SMALL_POOL_SIZE	64
#define IOBUF_MIN_LARGE_POOL_SIZE	8
#define IOBUF_DEFAULT_SMALL_POOL_SIZE	8192
#define IOBUF_DEFAULT_LARGE_POOL_SIZE	1024
#define IOBUF_ALIGNMENT			4096
#define IOBUF_MIN_SMALL_BUFSIZE		4096
#define IOBUF_MIN_LARGE_BUFSIZE		8192
#define IOBUF_DEFAULT_SMALL_BUFSIZE	(8 * 1024)
/* 132k may seem like an odd choice at first, but the default large buffer needs
 * to be big enough to accommodate the default maximum I/O size (128k) plus
 * metadata everywhere. For code paths that are explicitly configured, the math
 * is done properly instead; this value only covers the default. */
#define IOBUF_DEFAULT_LARGE_BUFSIZE	(132 * 1024)
#define IOBUF_MAX_CHANNELS		64

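/* While cached on a channel, a buffer is linked through a struct
 * spdk_iobuf_buffer header stored in the buffer's own memory, so even the
 * smallest buffer must be large enough to hold that header. */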
SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

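/* Per-thread context shared by every spdk_iobuf_channel on a thread: the wait
 * queues for requests that couldn't get a buffer, plus a registry of the
 * channels themselves, used when aggregating per-module statistics. */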
struct iobuf_channel {
	spdk_iobuf_entry_stailq_t	small_queue;
	spdk_iobuf_entry_stailq_t	large_queue;
	struct spdk_iobuf_channel	*channels[IOBUF_MAX_CHANNELS];
};

struct iobuf_module {
	char				*name;
	TAILQ_ENTRY(iobuf_module)	tailq;
};

struct iobuf {
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
	},
};

struct iobuf_get_stats_ctx {
	struct spdk_iobuf_module_stats	*modules;
	uint32_t			num_modules;
	spdk_iobuf_get_stats_cb		cb_fn;
	void				*cb_arg;
};

static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;

	STAILQ_INIT(&ch->small_queue);
	STAILQ_INIT(&ch->large_queue);

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

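/*
 * Typical lifecycle, as an illustrative sketch (not part of this file; the
 * module name "my_module" and the finish callback names are placeholders):
 *
 *	spdk_iobuf_register_module("my_module");
 *	spdk_iobuf_initialize();
 *	...
 *	struct spdk_iobuf_channel ch;
 *	spdk_iobuf_channel_init(&ch, "my_module", 128, 16);
 *	...
 *	spdk_iobuf_channel_fini(&ch);
 *	spdk_iobuf_finish(finish_cb, finish_arg);
 */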
int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;

	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");

	return 0;
error:
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);

	return rc;
}

static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %" PRIu64 "\n",
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %" PRIu64 "\n",
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}

	g_iobuf.opts = *opts;

	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_WARNLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_SMALL_BUFSIZE);
		g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE;
	}

	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_LARGE_BUFSIZE);
		g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE;
	}

	return 0;
}

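/*
 * Options take effect only if set before spdk_iobuf_initialize(), which sizes
 * the pools from g_iobuf.opts. An illustrative sketch (the count below is
 * arbitrary):
 *
 *	struct spdk_iobuf_opts opts;
 *
 *	spdk_iobuf_get_opts(&opts);
 *	opts.small_pool_count = 16384;
 *	spdk_iobuf_set_opts(&opts);
 */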
void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
	*opts = g_iobuf.opts;
}

int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			break;
		}
	}

	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == NULL) {
			iobuf_ch->channels[i] = ch;
			break;
		}
	}

	if (i == IOBUF_MAX_CHANNELS) {
		SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i);
		/* ch hasn't been initialized yet, so don't go through
		 * spdk_iobuf_channel_fini() here. */
		spdk_put_io_channel(ioch);
		return -ENOMEM;
	}

	ch->small.queue = &iobuf_ch->small_queue;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->large.pool = g_iobuf.large_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%" PRIu64 ")\n",
				    g_iobuf.opts.small_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%" PRIu64 ")\n",
				    g_iobuf.opts.large_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
}

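/* Channel caches are carved out of the global pools up front: initialization
 * fails if a cache cannot be fully populated, so the summed cache sizes across
 * all threads and modules must stay within the configured pool counts
 * (scripts/calc-iobuf.py can help size these). */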
void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;
	struct iobuf_channel *iobuf_ch;
	uint32_t i;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		ch->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);

	iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == ch) {
			iobuf_ch->channels[i] = NULL;
			break;
		}
	}

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			return -EEXIST;
		}
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		return -ENOMEM;
	}

	module->name = strdup(name);
	if (module->name == NULL) {
		free(module);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

int
spdk_iobuf_unregister_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
			free(module->name);
			free(module);
			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
	int rc;

	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
		/* We only want to iterate over the entries requested by the module which owns ch */
		if (entry->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, entry, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;

	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

#define IOBUF_BATCH_SIZE 32

void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
		pool->stats.cache++;
	} else {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			if (entry) {
				STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
				entry->module = ch->module;
				entry->cb_fn = cb_fn;
				pool->stats.retry++;
			}

			return NULL;
		}

		pool->stats.main++;
		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return buf;
}

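/*
 * If no buffer is available and an entry is provided, the request parks on the
 * wait queue and cb_fn fires from a later spdk_iobuf_put(). An illustrative
 * sketch (the ctx layout and retry_cb name are hypothetical):
 *
 *	buf = spdk_iobuf_get(&ctx->ch, len, &ctx->entry, retry_cb);
 *	if (buf == NULL) {
 *		return; // retry_cb(&ctx->entry, buf) runs once a buffer frees up
 *	}
 */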
void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	size_t sz;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}

static void
iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);

	ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg);
	free(ctx->modules);
	free(ctx);
}

static void
iobuf_get_channel_stats(struct spdk_io_channel_iter *iter)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter);
	struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch);
	struct spdk_iobuf_channel *channel;
	struct iobuf_module *module;
	struct spdk_iobuf_module_stats *it;
	uint32_t i, j;

	for (i = 0; i < ctx->num_modules; ++i) {
		for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) {
			channel = iobuf_ch->channels[j];
			if (channel == NULL) {
				continue;
			}

			it = &ctx->modules[i];
			module = (struct iobuf_module *)channel->module;
			if (strcmp(it->module, module->name) == 0) {
				it->small_pool.cache += channel->small.stats.cache;
				it->small_pool.main += channel->small.stats.main;
				it->small_pool.retry += channel->small.stats.retry;
				it->large_pool.cache += channel->large.stats.cache;
				it->large_pool.main += channel->large.stats.main;
				it->large_pool.retry += channel->large.stats.retry;
				break;
			}
		}
	}

	spdk_for_each_channel_continue(iter, 0);
}

int
spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg)
{
	struct iobuf_module *module;
	struct iobuf_get_stats_ctx *ctx;
	uint32_t i;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		++ctx->num_modules;
	}

	ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats));
	if (ctx->modules == NULL) {
		free(ctx);
		return -ENOMEM;
	}

	i = 0;
	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		ctx->modules[i].module = module->name;
		++i;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx,
			      iobuf_get_channel_stats_done);
	return 0;
}
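
/*
 * Stats are aggregated asynchronously across all threads before the callback
 * runs. An illustrative sketch (the callback name is hypothetical):
 *
 *	static void
 *	stats_cb(struct spdk_iobuf_module_stats *modules, uint32_t num_modules, void *arg)
 *	{
 *		for (uint32_t i = 0; i < num_modules; i++) {
 *			printf("%s: small cache hits: %" PRIu64 "\n",
 *			       modules[i].module, modules[i].small_pool.cache);
 *		}
 *	}
 *
 *	spdk_iobuf_get_stats(stats_cb, NULL);
 */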