/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2023 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"

#define IOBUF_MIN_SMALL_POOL_SIZE	64
#define IOBUF_MIN_LARGE_POOL_SIZE	8
#define IOBUF_DEFAULT_SMALL_POOL_SIZE	8192
#define IOBUF_DEFAULT_LARGE_POOL_SIZE	1024
#define IOBUF_ALIGNMENT			4096
#define IOBUF_MIN_SMALL_BUFSIZE		4096
#define IOBUF_MIN_LARGE_BUFSIZE		8192
#define IOBUF_DEFAULT_SMALL_BUFSIZE	(8 * 1024)
/* 132k is a weird choice at first, but this needs to be large enough to accommodate
 * the default maximum size (128k) plus metadata everywhere. For code paths that
 * are explicitly configured, the math is instead done properly. This is only
 * for the default. */
#define IOBUF_DEFAULT_LARGE_BUFSIZE	(132 * 1024)
#define IOBUF_MAX_CHANNELS		64

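/* Free buffers are linked through a node (struct spdk_iobuf_buffer) stored in
 * the buffer memory itself, so even the smallest buffer must be large enough
 * to hold one. */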
SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

static bool g_iobuf_is_initialized = false;

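/* Per-thread context for the iobuf io_device: wait queues for buffer requests
 * that couldn't be satisfied immediately, plus the spdk_iobuf_channels
 * registered by modules on this thread. */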
struct iobuf_channel {
	spdk_iobuf_entry_stailq_t	small_queue;
	spdk_iobuf_entry_stailq_t	large_queue;
	struct spdk_iobuf_channel	*channels[IOBUF_MAX_CHANNELS];
};

struct iobuf_module {
	char				*name;
	TAILQ_ENTRY(iobuf_module)	tailq;
};

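/* Global iobuf state: the shared rings acting as central small/large buffer
 * pools, the contiguous allocations backing them, the active options, and the
 * list of registered modules. */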
struct iobuf {
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
	},
};

struct iobuf_get_stats_ctx {
	struct spdk_iobuf_module_stats	*modules;
	uint32_t			num_modules;
	spdk_iobuf_get_stats_cb		cb_fn;
	void				*cb_arg;
};

static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;

	STAILQ_INIT(&ch->small_queue);
	STAILQ_INIT(&ch->large_queue);

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

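/* Create the MP/MC rings backing the small and large pools, allocate one
 * DMA-able region per pool, carve each region into fixed-size,
 * IOBUF_ALIGNMENT-aligned elements, and seed the rings with them. */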
int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;

	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");
	g_iobuf_is_initialized = true;

	return 0;
error:
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);

	return rc;
}

static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	if (!g_iobuf_is_initialized) {
		cb_fn(cb_arg);
		return;
	}

	g_iobuf_is_initialized = false;
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

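/* Validate and apply new options.  Pool counts below the minimum are rejected,
 * while undersized bufsize values are clamped up with a warning.  Note that
 * the pools are sized from these values in spdk_iobuf_initialize(), so this is
 * meant to be called before initialization. */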
int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}

	g_iobuf.opts = *opts;

	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_WARNLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_SMALL_BUFSIZE);
		g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE;
	}

	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_LARGE_BUFSIZE);
		g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE;
	}

	return 0;
}

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
	*opts = g_iobuf.opts;
}

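/* Illustrative usage from a module's I/O channel create callback (the module
 * name "my_module" and the cache sizes are placeholders, not values mandated
 * by this file; spdk_iobuf_initialize() must have run first):
 *
 *	spdk_iobuf_register_module("my_module");	// once, at module init
 *	...
 *	struct spdk_iobuf_channel ch;
 *	int rc = spdk_iobuf_channel_init(&ch, "my_module", 128, 16);
 *	if (rc != 0) {
 *		// handle error
 *	}
 *	...
 *	spdk_iobuf_channel_fini(&ch);			// on channel destroy
 */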
int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			break;
		}
	}

	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == NULL) {
			iobuf_ch->channels[i] = ch;
			break;
		}
	}

	if (i == IOBUF_MAX_CHANNELS) {
		SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i);
		/* ch hasn't been initialized yet, so it can't go through
		 * spdk_iobuf_channel_fini(); release the parent channel directly. */
		spdk_put_io_channel(ioch);
		return -ENOMEM;
	}

	ch->small.queue = &iobuf_ch->small_queue;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->large.pool = g_iobuf.large_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
				    g_iobuf.opts.small_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
				    g_iobuf.opts.large_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
}

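/* Tear down a module's channel: assert the module has no entries left on the
 * wait queues, return all cached buffers to the shared rings, and drop the
 * reference on the parent io_channel. */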
void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;
	struct iobuf_channel *iobuf_ch;
	uint32_t i;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		ch->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);

	iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
		if (iobuf_ch->channels[i] == ch) {
			iobuf_ch->channels[i] = NULL;
			break;
		}
	}

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			return -EEXIST;
		}
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		return -ENOMEM;
	}

	module->name = strdup(name);
	if (module->name == NULL) {
		free(module);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

int
spdk_iobuf_unregister_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
			free(module->name);
			free(module);
			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
	int rc;

	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
		/* We only want to iterate over the entries requested by the module which owns ch */
		if (entry->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, entry, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

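/* Remove a still-pending wait entry from its queue.  len must fall in the same
 * size class (small or large) as the original spdk_iobuf_get() request, since
 * it selects which queue the entry is removed from. */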
void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;

	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

#define IOBUF_BATCH_SIZE 32

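/* Get a buffer of at least len bytes.  Lookup order: the per-channel cache
 * first, then a batch dequeue from the shared ring (the extras are parked in
 * the cache).  If both are empty and entry is non-NULL, the request is queued
 * and cb_fn fires once a buffer is returned via spdk_iobuf_put() on this
 * thread.  A hypothetical caller sketch (req and my_get_cb are placeholders):
 *
 *	buf = spdk_iobuf_get(ch, 4096, &req->entry, my_get_cb);
 *	if (buf == NULL) {
 *		return;	// my_get_cb will be invoked with a buffer later
 *	}
 */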
void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
		pool->stats.cache++;
	} else {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			if (entry) {
				STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
				entry->module = ch->module;
				entry->cb_fn = cb_fn;
				pool->stats.retry++;
			}

			return NULL;
		}

		pool->stats.main++;
		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return buf;
}

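/* Return a buffer.  If an entry is waiting on this thread, the buffer is
 * handed straight to its callback; otherwise it goes into the per-channel
 * cache.  The cache is allowed to overfill by up to one batch so that buffers
 * can be flushed back to the shared ring in bulk rather than one at a time. */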
void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	size_t sz;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}

static void
iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);

	ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg);
	free(ctx->modules);
	free(ctx);
}

static void
iobuf_get_channel_stats(struct spdk_io_channel_iter *iter)
{
	struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter);
	struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch);
	struct spdk_iobuf_channel *channel;
	struct iobuf_module *module;
	struct spdk_iobuf_module_stats *it;
	uint32_t i, j;

	for (i = 0; i < ctx->num_modules; ++i) {
		for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) {
			channel = iobuf_ch->channels[j];
			if (channel == NULL) {
				continue;
			}

			it = &ctx->modules[i];
			module = (struct iobuf_module *)channel->module;
			if (strcmp(it->module, module->name) == 0) {
				it->small_pool.cache += channel->small.stats.cache;
				it->small_pool.main += channel->small.stats.main;
				it->small_pool.retry += channel->small.stats.retry;
				it->large_pool.cache += channel->large.stats.cache;
				it->large_pool.main += channel->large.stats.main;
				it->large_pool.retry += channel->large.stats.retry;
				break;
			}
		}
	}

	spdk_for_each_channel_continue(iter, 0);
}

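/* Aggregate per-module buffer usage counters across all threads.  Iterates
 * every iobuf channel via spdk_for_each_channel() and invokes cb_fn with the
 * summed statistics once the iteration completes. */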
int
spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg)
{
	struct iobuf_module *module;
	struct iobuf_get_stats_ctx *ctx;
	uint32_t i;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		++ctx->num_modules;
	}

	ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats));
	if (ctx->modules == NULL) {
		free(ctx);
		return -ENOMEM;
	}

	i = 0;
	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		ctx->modules[i].module = module->name;
		++i;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx,
			      iobuf_get_channel_stats_done);
	return 0;
}