/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2023 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/thread.h"
#include "spdk/bdev.h"

#define IOBUF_MIN_SMALL_POOL_SIZE	8191
#define IOBUF_MIN_LARGE_POOL_SIZE	1023
#define IOBUF_ALIGNMENT			512
#define IOBUF_MIN_SMALL_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \
					 IOBUF_ALIGNMENT)
#define IOBUF_MIN_LARGE_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \
					 IOBUF_ALIGNMENT)

SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
		   "Invalid data offset");

struct iobuf_channel {
	spdk_iobuf_entry_stailq_t small_queue;
	spdk_iobuf_entry_stailq_t large_queue;
};

struct iobuf_module {
	char *name;
	TAILQ_ENTRY(iobuf_module) tailq;
};

struct iobuf {
	struct spdk_mempool *small_pool;
	struct spdk_mempool *large_pool;
	struct spdk_iobuf_opts opts;
	TAILQ_HEAD(, iobuf_module) modules;
	spdk_iobuf_finish_cb finish_cb;
	void *finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.opts = {
		.small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE,
	},
};

static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch = ctx;

	STAILQ_INIT(&ch->small_queue);
	STAILQ_INIT(&ch->large_queue);

	return 0;
}

static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;

	g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
			     opts->small_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
			     opts->large_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");

	return 0;
error:
	spdk_mempool_free(g_iobuf.small_pool);
	return rc;
}

static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_mempool_free(g_iobuf.small_pool);
	spdk_mempool_free(g_iobuf.large_pool);

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_BUFSIZE);
		return -EINVAL;
	}
	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_BUFSIZE);
		return -EINVAL;
	}

	g_iobuf.opts = *opts;

	return 0;
}

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
	*opts = g_iobuf.opts;
}
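
/*
 * Illustrative sketch (assumed consumer code, not part of this file): since
 * spdk_iobuf_set_opts() only records the options and spdk_iobuf_initialize()
 * creates the pools from whatever is recorded, larger pools have to be
 * requested before initialization, e.g.:
 *
 *	struct spdk_iobuf_opts opts;
 *
 *	spdk_iobuf_get_opts(&opts);
 *	opts.small_pool_count = 16383;	// hypothetical value, >= IOBUF_MIN_SMALL_POOL_SIZE
 *	opts.large_pool_count = 2047;	// hypothetical value, >= IOBUF_MIN_LARGE_POOL_SIZE
 *	if (spdk_iobuf_set_opts(&opts) != 0) {
 *		// reject the configuration; the minimums above are enforced
 *	}
 */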

int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			break;
		}
	}

	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	ch->small.queue = &iobuf_ch->small_queue;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->large.pool = g_iobuf.large_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		buf = spdk_mempool_get(g_iobuf.small_pool);
		if (buf == NULL) {
			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		buf = spdk_mempool_get(g_iobuf.large_pool);
		if (buf == NULL) {
			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
}

void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_mempool_put(ch->small.pool, buf);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_mempool_put(ch->large.pool, buf);
		ch->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
			return -EEXIST;
		}
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		return -ENOMEM;
	}

	module->name = strdup(name);
	if (module->name == NULL) {
		free(module);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
	int rc;

	STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
		/* We only want to iterate over the entries requested by the module which owns ch */
		if (entry->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, entry, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;

	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

/*
 * Buffer acquisition first tries the per-channel cache, then the global
 * mempool.  If both are exhausted, the caller's entry is queued on the shared
 * per-thread wait queue and its callback is invoked from spdk_iobuf_put()
 * once another user of the same channel returns a buffer of that size class.
 */
void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
	} else {
		buf = spdk_mempool_get(pool->pool);
		if (!buf) {
			STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
			entry->module = ch->module;
			entry->cb_fn = cb_fn;

			return NULL;
		}
	}

	return (char *)buf;
}

void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_pool *pool;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_count < pool->cache_size) {
			STAILQ_INSERT_HEAD(&pool->cache, (struct spdk_iobuf_buffer *)buf, stailq);
			pool->cache_count++;
		} else {
			spdk_mempool_put(pool->pool, buf);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}
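
/*
 * Illustrative usage sketch (assumed consumer code, not part of this file):
 * a module registers itself once, binds a channel on each thread that needs
 * buffers, and then acquires/releases buffers through that channel.  The
 * names below ("my_module", retry_cb, the cache sizes and the length) are
 * hypothetical; retry_cb stands for an spdk_iobuf_get_cb callback:
 *
 *	struct spdk_iobuf_channel ch;
 *	struct spdk_iobuf_entry entry;
 *	void *buf;
 *
 *	spdk_iobuf_register_module("my_module");
 *	spdk_iobuf_channel_init(&ch, "my_module", 128, 16);
 *
 *	buf = spdk_iobuf_get(&ch, 4096, &entry, retry_cb);
 *	if (buf != NULL) {
 *		// use the buffer, then return it
 *		spdk_iobuf_put(&ch, buf, 4096);
 *	} else {
 *		// no buffer available: retry_cb(&entry, buf) will be called
 *		// once spdk_iobuf_put() frees one up on this channel
 *	}
 *
 *	spdk_iobuf_channel_fini(&ch);
 */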