xref: /spdk/lib/thread/iobuf.c (revision 838e61c3772fdefb17e1a0b8f9880e2bcb9c4c0d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2023 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/env.h"
7 #include "spdk/util.h"
8 #include "spdk/likely.h"
9 #include "spdk/log.h"
10 #include "spdk/thread.h"
11 #include "spdk/bdev.h"
12 
/*
 * Smallest buffer counts accepted by spdk_iobuf_set_opts(); they also serve as
 * the default pool sizes in g_iobuf.opts.
 */
#define IOBUF_MIN_SMALL_POOL_SIZE	8191
#define IOBUF_MIN_LARGE_POOL_SIZE	1023
/* Extra bytes added on top of the metadata-inclusive size in the minimum buffer sizes below */
#define IOBUF_ALIGNMENT			512
/*
 * Smallest buffer sizes accepted by spdk_iobuf_set_opts() (also the defaults):
 * the bdev small/large maximum buffer size including metadata, plus IOBUF_ALIGNMENT.
 */
#define IOBUF_MIN_SMALL_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \
					 IOBUF_ALIGNMENT)
#define IOBUF_MIN_LARGE_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \
					 IOBUF_ALIGNMENT)

/*
 * Buffers are linked into the per-channel caches by casting the buffer memory
 * to struct spdk_iobuf_buffer (see spdk_iobuf_put()), so that struct has to fit
 * inside even the smallest permitted buffer.
 */
SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");
23 
/*
 * Context stored in each io_channel of the iobuf io_device. It holds the wait
 * queues that spdk_iobuf_channel_init() hands out to spdk_iobuf_channel objects.
 */
struct iobuf_channel {
	/* Entries queued by spdk_iobuf_get() while no small buffer was available */
	spdk_iobuf_entry_stailq_t small_queue;
	/* Entries queued by spdk_iobuf_get() while no large buffer was available */
	spdk_iobuf_entry_stailq_t large_queue;
};
28 
/* A module registered through spdk_iobuf_register_module() */
struct iobuf_module {
	/* Heap-allocated copy of the module name (strdup); freed in iobuf_unregister_cb() */
	char				*name;
	/* Link in g_iobuf.modules */
	TAILQ_ENTRY(iobuf_module)	tailq;
};
33 
/* Global state of the iobuf library; the single instance is g_iobuf */
struct iobuf {
	/* Mempool of small buffers, created in spdk_iobuf_initialize() */
	struct spdk_mempool		*small_pool;
	/* Mempool of large buffers, created in spdk_iobuf_initialize() */
	struct spdk_mempool		*large_pool;
	/* Pool counts and buffer sizes, updated via spdk_iobuf_set_opts() */
	struct spdk_iobuf_opts		opts;
	/* Modules added by spdk_iobuf_register_module() */
	TAILQ_HEAD(, iobuf_module)	modules;
	/* Completion callback and its argument, stored by spdk_iobuf_finish() */
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
};
42 
/*
 * The one iobuf instance. Its address is also used as the io_device handle
 * passed to spdk_io_device_register()/spdk_get_io_channel(). The options start
 * out at the minimum values that spdk_iobuf_set_opts() accepts.
 */
static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.opts = {
		.small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE,
	},
};
52 
53 static int
54 iobuf_channel_create_cb(void *io_device, void *ctx)
55 {
56 	struct iobuf_channel *ch = ctx;
57 
58 	STAILQ_INIT(&ch->small_queue);
59 	STAILQ_INIT(&ch->large_queue);
60 
61 	return 0;
62 }
63 
64 static void
65 iobuf_channel_destroy_cb(void *io_device, void *ctx)
66 {
67 	struct iobuf_channel *ch __attribute__((unused)) = ctx;
68 
69 	assert(STAILQ_EMPTY(&ch->small_queue));
70 	assert(STAILQ_EMPTY(&ch->large_queue));
71 }
72 
73 int
74 spdk_iobuf_initialize(void)
75 {
76 	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
77 	int rc = 0;
78 
79 	g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
80 			     opts->small_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
81 	if (!g_iobuf.small_pool) {
82 		SPDK_ERRLOG("Failed to create small iobuf pool\n");
83 		rc = -ENOMEM;
84 		goto error;
85 	}
86 
87 	g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
88 			     opts->large_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
89 	if (!g_iobuf.large_pool) {
90 		SPDK_ERRLOG("Failed to create large iobuf pool\n");
91 		rc = -ENOMEM;
92 		goto error;
93 	}
94 
95 	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
96 				sizeof(struct iobuf_channel), "iobuf");
97 
98 	return 0;
99 error:
100 	spdk_mempool_free(g_iobuf.small_pool);
101 	return rc;
102 }
103 
104 static void
105 iobuf_unregister_cb(void *io_device)
106 {
107 	struct iobuf_module *module;
108 
109 	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
110 		module = TAILQ_FIRST(&g_iobuf.modules);
111 		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
112 		free(module->name);
113 		free(module);
114 	}
115 
116 	if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
117 		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
118 			    spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
119 	}
120 
121 	if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
122 		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
123 			    spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
124 	}
125 
126 	spdk_mempool_free(g_iobuf.small_pool);
127 	spdk_mempool_free(g_iobuf.large_pool);
128 
129 	if (g_iobuf.finish_cb != NULL) {
130 		g_iobuf.finish_cb(g_iobuf.finish_arg);
131 	}
132 }
133 
134 void
135 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
136 {
137 	g_iobuf.finish_cb = cb_fn;
138 	g_iobuf.finish_arg = cb_arg;
139 
140 	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
141 }
142 
143 int
144 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
145 {
146 	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
147 		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
148 			    IOBUF_MIN_SMALL_POOL_SIZE);
149 		return -EINVAL;
150 	}
151 	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
152 		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
153 			    IOBUF_MIN_LARGE_POOL_SIZE);
154 		return -EINVAL;
155 	}
156 	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
157 		SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
158 			    IOBUF_MIN_SMALL_BUFSIZE);
159 		return -EINVAL;
160 	}
161 	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
162 		SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
163 			    IOBUF_MIN_LARGE_BUFSIZE);
164 		return -EINVAL;
165 	}
166 
167 	g_iobuf.opts = *opts;
168 
169 	return 0;
170 }
171 
172 void
173 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
174 {
175 	*opts = g_iobuf.opts;
176 }
177 
178 int
179 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
180 			uint32_t small_cache_size, uint32_t large_cache_size)
181 {
182 	struct spdk_io_channel *ioch;
183 	struct iobuf_channel *iobuf_ch;
184 	struct iobuf_module *module;
185 	struct spdk_iobuf_buffer *buf;
186 	uint32_t i;
187 
188 	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
189 		if (strcmp(name, module->name) == 0) {
190 			break;
191 		}
192 	}
193 
194 	if (module == NULL) {
195 		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
196 		return -ENODEV;
197 	}
198 
199 	ioch = spdk_get_io_channel(&g_iobuf);
200 	if (ioch == NULL) {
201 		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
202 		return -ENOMEM;
203 	}
204 
205 	iobuf_ch = spdk_io_channel_get_ctx(ioch);
206 
207 	ch->small.queue = &iobuf_ch->small_queue;
208 	ch->large.queue = &iobuf_ch->large_queue;
209 	ch->small.pool = g_iobuf.small_pool;
210 	ch->large.pool = g_iobuf.large_pool;
211 	ch->small.bufsize = g_iobuf.opts.small_bufsize;
212 	ch->large.bufsize = g_iobuf.opts.large_bufsize;
213 	ch->parent = ioch;
214 	ch->module = module;
215 	ch->small.cache_size = small_cache_size;
216 	ch->large.cache_size = large_cache_size;
217 	ch->small.cache_count = 0;
218 	ch->large.cache_count = 0;
219 
220 	STAILQ_INIT(&ch->small.cache);
221 	STAILQ_INIT(&ch->large.cache);
222 
223 	for (i = 0; i < small_cache_size; ++i) {
224 		buf = spdk_mempool_get(g_iobuf.small_pool);
225 		if (buf == NULL) {
226 			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
227 				    "You may need to increase spdk_iobuf_opts.small_pool_count\n");
228 			goto error;
229 		}
230 		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
231 		ch->small.cache_count++;
232 	}
233 	for (i = 0; i < large_cache_size; ++i) {
234 		buf = spdk_mempool_get(g_iobuf.large_pool);
235 		if (buf == NULL) {
236 			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
237 				    "You may need to increase spdk_iobuf_opts.large_pool_count\n");
238 			goto error;
239 		}
240 		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
241 		ch->large.cache_count++;
242 	}
243 
244 	return 0;
245 error:
246 	spdk_iobuf_channel_fini(ch);
247 
248 	return -ENOMEM;
249 }
250 
251 void
252 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
253 {
254 	struct spdk_iobuf_entry *entry __attribute__((unused));
255 	struct spdk_iobuf_buffer *buf;
256 
257 	/* Make sure none of the wait queue entries are coming from this module */
258 	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
259 		assert(entry->module != ch->module);
260 	}
261 	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
262 		assert(entry->module != ch->module);
263 	}
264 
265 	/* Release cached buffers back to the pool */
266 	while (!STAILQ_EMPTY(&ch->small.cache)) {
267 		buf = STAILQ_FIRST(&ch->small.cache);
268 		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
269 		spdk_mempool_put(ch->small.pool, buf);
270 		ch->small.cache_count--;
271 	}
272 	while (!STAILQ_EMPTY(&ch->large.cache)) {
273 		buf = STAILQ_FIRST(&ch->large.cache);
274 		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
275 		spdk_mempool_put(ch->large.pool, buf);
276 		ch->large.cache_count--;
277 	}
278 
279 	assert(ch->small.cache_count == 0);
280 	assert(ch->large.cache_count == 0);
281 
282 	spdk_put_io_channel(ch->parent);
283 	ch->parent = NULL;
284 }
285 
286 int
287 spdk_iobuf_register_module(const char *name)
288 {
289 	struct iobuf_module *module;
290 
291 	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
292 		if (strcmp(name, module->name) == 0) {
293 			return -EEXIST;
294 		}
295 	}
296 
297 	module = calloc(1, sizeof(*module));
298 	if (module == NULL) {
299 		return -ENOMEM;
300 	}
301 
302 	module->name = strdup(name);
303 	if (module->name == NULL) {
304 		free(module);
305 		return -ENOMEM;
306 	}
307 
308 	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);
309 
310 	return 0;
311 }
312 
313 int
314 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
315 			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
316 {
317 	struct spdk_iobuf_entry *entry, *tmp;
318 	int rc;
319 
320 	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
321 		/* We only want to iterate over the entries requested by the module which owns ch */
322 		if (entry->module != ch->module) {
323 			continue;
324 		}
325 
326 		rc = cb_fn(ch, entry, cb_ctx);
327 		if (rc != 0) {
328 			return rc;
329 		}
330 	}
331 
332 	return 0;
333 }
334 
335 void
336 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
337 		       uint64_t len)
338 {
339 	struct spdk_iobuf_pool *pool;
340 
341 	if (len <= ch->small.bufsize) {
342 		pool = &ch->small;
343 	} else {
344 		assert(len <= ch->large.bufsize);
345 		pool = &ch->large;
346 	}
347 
348 	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
349 }
350 
351 void *
352 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
353 	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
354 {
355 	struct spdk_iobuf_pool *pool;
356 	void *buf;
357 
358 	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
359 	if (len <= ch->small.bufsize) {
360 		pool = &ch->small;
361 	} else {
362 		assert(len <= ch->large.bufsize);
363 		pool = &ch->large;
364 	}
365 
366 	buf = (void *)STAILQ_FIRST(&pool->cache);
367 	if (buf) {
368 		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
369 		assert(pool->cache_count > 0);
370 		pool->cache_count--;
371 	} else {
372 		buf = spdk_mempool_get(pool->pool);
373 		if (!buf) {
374 			STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
375 			entry->module = ch->module;
376 			entry->cb_fn = cb_fn;
377 
378 			return NULL;
379 		}
380 	}
381 
382 	return (char *)buf;
383 }
384 
385 void
386 spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
387 {
388 	struct spdk_iobuf_entry *entry;
389 	struct spdk_iobuf_pool *pool;
390 
391 	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
392 	if (len <= ch->small.bufsize) {
393 		pool = &ch->small;
394 	} else {
395 		pool = &ch->large;
396 	}
397 
398 	if (STAILQ_EMPTY(pool->queue)) {
399 		if (pool->cache_count < pool->cache_size) {
400 			STAILQ_INSERT_HEAD(&pool->cache, (struct spdk_iobuf_buffer *)buf, stailq);
401 			pool->cache_count++;
402 		} else {
403 			spdk_mempool_put(pool->pool, buf);
404 		}
405 	} else {
406 		entry = STAILQ_FIRST(pool->queue);
407 		STAILQ_REMOVE_HEAD(pool->queue, stailq);
408 		entry->cb_fn(entry, buf);
409 	}
410 }
411