/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2023 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"

#define IOBUF_MIN_SMALL_POOL_SIZE	64
#define IOBUF_MIN_LARGE_POOL_SIZE	8
#define IOBUF_DEFAULT_SMALL_POOL_SIZE	8192
#define IOBUF_DEFAULT_LARGE_POOL_SIZE	1024
#define IOBUF_ALIGNMENT			4096
#define IOBUF_MIN_SMALL_BUFSIZE		4096
#define IOBUF_MIN_LARGE_BUFSIZE		8192
#define IOBUF_DEFAULT_SMALL_BUFSIZE	(8 * 1024)
/* 132k may look like an odd choice at first, but it needs to be large enough to
 * accommodate the default maximum size (128k) plus metadata. Code paths that
 * are explicitly configured compute the required size exactly; this value only
 * covers the default. */
#define IOBUF_DEFAULT_LARGE_BUFSIZE	(132 * 1024)

SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

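/* Per-thread context for the iobuf io_device. The queues hold spdk_iobuf_entry
 * waiters that could not be satisfied immediately and are waiting for a small or
 * large buffer to be returned on this thread. */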
struct iobuf_channel {
	spdk_iobuf_entry_stailq_t small_queue;
	spdk_iobuf_entry_stailq_t large_queue;
};

struct iobuf_module {
	char				*name;
	TAILQ_ENTRY(iobuf_module)	tailq;
};

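/* Global iobuf state: the lock-free rings act as the central small/large buffer
 * pools, the *_pool_base pointers track the contiguous DMA-able allocations the
 * buffers are carved from, and modules lists every registered consumer. */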
struct iobuf {
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
	},
};

static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;

	STAILQ_INIT(&ch->small_queue);
	STAILQ_INIT(&ch->large_queue);

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

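/* Create the central pools and register the iobuf io_device. Each pool is a
 * multi-producer/multi-consumer ring backed by a single DMA-capable allocation
 * that is sliced into IOBUF_ALIGNMENT-aligned elements. */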
int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;

	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	/* Round up to the nearest alignment so that each element remains aligned */
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					      NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");

	return 0;
error:
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);

	return rc;
}

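/* Final unregister callback: runs once all io_channels have been released. Any
 * mismatch between the ring counts and the configured pool sizes at this point
 * indicates buffers that were never returned. */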
static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

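/* Validate and store user-provided options. The values are consumed by
 * spdk_iobuf_initialize(), so this needs to be called before initialization for
 * the new pool and buffer sizes to take effect. */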
int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}

	g_iobuf.opts = *opts;

	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_WARNLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_SMALL_BUFSIZE);
		g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE;
	}

	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n",
			     IOBUF_MIN_LARGE_BUFSIZE);
		g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE;
	}

	return 0;
}

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
	*opts = g_iobuf.opts;
}

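/* Initialize a module's per-thread iobuf channel and pre-populate its local
 * caches by dequeuing small_cache_size/large_cache_size buffers from the central
 * pools. The module must have been registered via spdk_iobuf_register_module(). */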
int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			break;
		}
	}

	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	ch->small.queue = &iobuf_ch->small_queue;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->large.pool = g_iobuf.large_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
				    g_iobuf.opts.small_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
				    g_iobuf.opts.large_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
}

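/* Tear down a per-thread channel: the owning module must not have any wait queue
 * entries left, and all locally cached buffers are returned to the central pools
 * before the underlying io_channel reference is dropped. */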
void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		ch->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			return -EEXIST;
		}
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		return -ENOMEM;
	}

	module->name = strdup(name);
	if (module->name == NULL) {
		free(module);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

int
spdk_iobuf_unregister_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
			free(module->name);
			free(module);
			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
	int rc;

	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
		/* We only want to iterate over the entries requested by the module which owns ch */
		if (entry->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, entry, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

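/* Remove a previously queued wait entry, e.g. when the request that was waiting
 * for a buffer is aborted before one became available. len selects the queue
 * (small or large) the entry was placed on. */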
void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;

	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

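/* Number of buffers moved between a channel's local cache and the central ring
 * in a single operation. */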
#define IOBUF_BATCH_SIZE 32

void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
	} else {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			if (entry) {
				STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
				entry->module = ch->module;
				entry->cb_fn = cb_fn;
			}

			return NULL;
		}

		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return (char *)buf;
}

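/* Return a buffer: if another request is queued waiting, hand the buffer straight
 * to it; otherwise place it in the local cache (or directly back into the central
 * ring when the cache is disabled) and flush a full batch to the central ring once
 * the cache has grown a batch beyond its configured size. */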
void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	size_t sz;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}

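/*
 * Illustrative usage sketch (editorial addition, not part of the original source):
 * a minimal consumer of this API, assuming spdk_iobuf_initialize() has already been
 * called by the application framework. The module name "example", the struct and
 * function names, and the sizes below are hypothetical.
 *
 *	struct example_request {
 *		struct spdk_iobuf_entry	iobuf;	// must stay valid while waiting for a buffer
 *		uint64_t		len;
 *	};
 *
 *	static void
 *	example_buf_ready(struct spdk_iobuf_entry *entry, void *buf)
 *	{
 *		struct example_request *req = SPDK_CONTAINEROF(entry, struct example_request, iobuf);
 *
 *		// A buffer became available after spdk_iobuf_get() returned NULL.
 *		// ... use buf for req, then return it with spdk_iobuf_put() ...
 *	}
 *
 *	static int
 *	example_channel_start(struct spdk_iobuf_channel *ch)
 *	{
 *		int rc;
 *
 *		rc = spdk_iobuf_register_module("example");
 *		if (rc != 0 && rc != -EEXIST) {
 *			return rc;
 *		}
 *
 *		// Cache up to 128 small and 16 large buffers on this thread.
 *		return spdk_iobuf_channel_init(ch, "example", 128, 16);
 *	}
 *
 *	static void
 *	example_submit(struct spdk_iobuf_channel *ch, struct example_request *req)
 *	{
 *		void *buf;
 *
 *		req->len = 4096;
 *		buf = spdk_iobuf_get(ch, req->len, &req->iobuf, example_buf_ready);
 *		if (buf == NULL) {
 *			// The request was queued; example_buf_ready() will fire later.
 *			return;
 *		}
 *
 *		// ... use buf, then hand it back ...
 *		spdk_iobuf_put(ch, buf, req->len);
 *	}
 */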
528