1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2023 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/env.h" 7 #include "spdk/util.h" 8 #include "spdk/likely.h" 9 #include "spdk/log.h" 10 #include "spdk/thread.h" 11 12 #define IOBUF_MIN_SMALL_POOL_SIZE 64 13 #define IOBUF_MIN_LARGE_POOL_SIZE 8 14 #define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192 15 #define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024 16 #define IOBUF_ALIGNMENT 4096 17 #define IOBUF_MIN_SMALL_BUFSIZE 4096 18 #define IOBUF_MIN_LARGE_BUFSIZE 8192 19 #define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024) 20 /* 132k is a weird choice at first, but this needs to be large enough to accomodate 21 * the default maximum size (128k) plus metadata everywhere. For code paths that 22 * are explicitly configured, the math is instead done properly. This is only 23 * for the default. */ 24 #define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024) 25 26 SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE, 27 "Invalid data offset"); 28 29 struct iobuf_channel { 30 spdk_iobuf_entry_stailq_t small_queue; 31 spdk_iobuf_entry_stailq_t large_queue; 32 }; 33 34 struct iobuf_module { 35 char *name; 36 TAILQ_ENTRY(iobuf_module) tailq; 37 }; 38 39 struct iobuf { 40 struct spdk_ring *small_pool; 41 struct spdk_ring *large_pool; 42 void *small_pool_base; 43 void *large_pool_base; 44 struct spdk_iobuf_opts opts; 45 TAILQ_HEAD(, iobuf_module) modules; 46 spdk_iobuf_finish_cb finish_cb; 47 void *finish_arg; 48 }; 49 50 static struct iobuf g_iobuf = { 51 .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules), 52 .small_pool = NULL, 53 .large_pool = NULL, 54 .small_pool_base = NULL, 55 .large_pool_base = NULL, 56 .opts = { 57 .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE, 58 .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE, 59 .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE, 60 .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE, 61 }, 62 }; 63 64 static int 65 iobuf_channel_create_cb(void *io_device, void *ctx) 66 { 67 struct iobuf_channel *ch = ctx; 68 69 STAILQ_INIT(&ch->small_queue); 70 STAILQ_INIT(&ch->large_queue); 71 72 return 0; 73 } 74 75 static void 76 iobuf_channel_destroy_cb(void *io_device, void *ctx) 77 { 78 struct iobuf_channel *ch __attribute__((unused)) = ctx; 79 80 assert(STAILQ_EMPTY(&ch->small_queue)); 81 assert(STAILQ_EMPTY(&ch->large_queue)); 82 } 83 84 int 85 spdk_iobuf_initialize(void) 86 { 87 struct spdk_iobuf_opts *opts = &g_iobuf.opts; 88 int rc = 0; 89 uint64_t i; 90 struct spdk_iobuf_buffer *buf; 91 92 g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count, 93 SPDK_ENV_SOCKET_ID_ANY); 94 if (!g_iobuf.small_pool) { 95 SPDK_ERRLOG("Failed to create small iobuf pool\n"); 96 rc = -ENOMEM; 97 goto error; 98 } 99 100 /* Round up to the nearest alignment so that each element remains aligned */ 101 opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT); 102 g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT, 103 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 104 if (g_iobuf.small_pool_base == NULL) { 105 SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n"); 106 rc = -ENOMEM; 107 goto error; 108 } 109 110 g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count, 111 SPDK_ENV_SOCKET_ID_ANY); 112 if (!g_iobuf.large_pool) { 113 SPDK_ERRLOG("Failed to create large iobuf pool\n"); 114 rc = -ENOMEM; 115 goto error; 116 } 117 118 /* Round up to the nearest alignment so that each element remains aligned */ 119 opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT); 120 g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT, 121 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 122 if (g_iobuf.large_pool_base == NULL) { 123 SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n"); 124 rc = -ENOMEM; 125 goto error; 126 } 127 128 for (i = 0; i < opts->small_pool_count; i++) { 129 buf = g_iobuf.small_pool_base + i * opts->small_bufsize; 130 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 131 } 132 133 for (i = 0; i < opts->large_pool_count; i++) { 134 buf = g_iobuf.large_pool_base + i * opts->large_bufsize; 135 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 136 } 137 138 spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb, 139 sizeof(struct iobuf_channel), "iobuf"); 140 141 return 0; 142 error: 143 spdk_free(g_iobuf.small_pool_base); 144 spdk_ring_free(g_iobuf.small_pool); 145 spdk_free(g_iobuf.large_pool_base); 146 spdk_ring_free(g_iobuf.large_pool); 147 148 return rc; 149 } 150 151 static void 152 iobuf_unregister_cb(void *io_device) 153 { 154 struct iobuf_module *module; 155 156 while (!TAILQ_EMPTY(&g_iobuf.modules)) { 157 module = TAILQ_FIRST(&g_iobuf.modules); 158 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 159 free(module->name); 160 free(module); 161 } 162 163 if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) { 164 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n", 165 spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count); 166 } 167 168 if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) { 169 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n", 170 spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count); 171 } 172 173 spdk_free(g_iobuf.small_pool_base); 174 g_iobuf.small_pool_base = NULL; 175 spdk_ring_free(g_iobuf.small_pool); 176 g_iobuf.small_pool = NULL; 177 178 spdk_free(g_iobuf.large_pool_base); 179 g_iobuf.large_pool_base = NULL; 180 spdk_ring_free(g_iobuf.large_pool); 181 g_iobuf.large_pool = NULL; 182 183 if (g_iobuf.finish_cb != NULL) { 184 g_iobuf.finish_cb(g_iobuf.finish_arg); 185 } 186 } 187 188 void 189 spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg) 190 { 191 g_iobuf.finish_cb = cb_fn; 192 g_iobuf.finish_arg = cb_arg; 193 194 spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb); 195 } 196 197 int 198 spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts) 199 { 200 if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) { 201 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n", 202 IOBUF_MIN_SMALL_POOL_SIZE); 203 return -EINVAL; 204 } 205 if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) { 206 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n", 207 IOBUF_MIN_LARGE_POOL_SIZE); 208 return -EINVAL; 209 } 210 211 g_iobuf.opts = *opts; 212 213 if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) { 214 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 215 IOBUF_MIN_SMALL_BUFSIZE); 216 g_iobuf.opts.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE; 217 } 218 219 if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) { 220 SPDK_WARNLOG("large_bufsize must be at least %" PRIu32 ". Automatically increasing.\n", 221 IOBUF_MIN_LARGE_BUFSIZE); 222 g_iobuf.opts.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE; 223 } 224 225 return 0; 226 } 227 228 void 229 spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts) 230 { 231 *opts = g_iobuf.opts; 232 } 233 234 int 235 spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name, 236 uint32_t small_cache_size, uint32_t large_cache_size) 237 { 238 struct spdk_io_channel *ioch; 239 struct iobuf_channel *iobuf_ch; 240 struct iobuf_module *module; 241 struct spdk_iobuf_buffer *buf; 242 uint32_t i; 243 244 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 245 if (strcmp(name, module->name) == 0) { 246 break; 247 } 248 } 249 250 if (module == NULL) { 251 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name); 252 return -ENODEV; 253 } 254 255 ioch = spdk_get_io_channel(&g_iobuf); 256 if (ioch == NULL) { 257 SPDK_ERRLOG("Couldn't get iobuf IO channel\n"); 258 return -ENOMEM; 259 } 260 261 iobuf_ch = spdk_io_channel_get_ctx(ioch); 262 263 ch->small.queue = &iobuf_ch->small_queue; 264 ch->large.queue = &iobuf_ch->large_queue; 265 ch->small.pool = g_iobuf.small_pool; 266 ch->large.pool = g_iobuf.large_pool; 267 ch->small.bufsize = g_iobuf.opts.small_bufsize; 268 ch->large.bufsize = g_iobuf.opts.large_bufsize; 269 ch->parent = ioch; 270 ch->module = module; 271 ch->small.cache_size = small_cache_size; 272 ch->large.cache_size = large_cache_size; 273 ch->small.cache_count = 0; 274 ch->large.cache_count = 0; 275 276 STAILQ_INIT(&ch->small.cache); 277 STAILQ_INIT(&ch->large.cache); 278 279 for (i = 0; i < small_cache_size; ++i) { 280 if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) { 281 SPDK_ERRLOG("Failed to populate iobuf small buffer cache. " 282 "You may need to increase spdk_iobuf_opts.small_pool_count\n"); 283 goto error; 284 } 285 STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq); 286 ch->small.cache_count++; 287 } 288 for (i = 0; i < large_cache_size; ++i) { 289 if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) { 290 SPDK_ERRLOG("Failed to populate iobuf large buffer cache. " 291 "You may need to increase spdk_iobuf_opts.large_pool_count\n"); 292 goto error; 293 } 294 STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq); 295 ch->large.cache_count++; 296 } 297 298 return 0; 299 error: 300 spdk_iobuf_channel_fini(ch); 301 302 return -ENOMEM; 303 } 304 305 void 306 spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch) 307 { 308 struct spdk_iobuf_entry *entry __attribute__((unused)); 309 struct spdk_iobuf_buffer *buf; 310 311 /* Make sure none of the wait queue entries are coming from this module */ 312 STAILQ_FOREACH(entry, ch->small.queue, stailq) { 313 assert(entry->module != ch->module); 314 } 315 STAILQ_FOREACH(entry, ch->large.queue, stailq) { 316 assert(entry->module != ch->module); 317 } 318 319 /* Release cached buffers back to the pool */ 320 while (!STAILQ_EMPTY(&ch->small.cache)) { 321 buf = STAILQ_FIRST(&ch->small.cache); 322 STAILQ_REMOVE_HEAD(&ch->small.cache, stailq); 323 spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL); 324 ch->small.cache_count--; 325 } 326 while (!STAILQ_EMPTY(&ch->large.cache)) { 327 buf = STAILQ_FIRST(&ch->large.cache); 328 STAILQ_REMOVE_HEAD(&ch->large.cache, stailq); 329 spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL); 330 ch->large.cache_count--; 331 } 332 333 assert(ch->small.cache_count == 0); 334 assert(ch->large.cache_count == 0); 335 336 spdk_put_io_channel(ch->parent); 337 ch->parent = NULL; 338 } 339 340 int 341 spdk_iobuf_register_module(const char *name) 342 { 343 struct iobuf_module *module; 344 345 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 346 if (strcmp(name, module->name) == 0) { 347 return -EEXIST; 348 } 349 } 350 351 module = calloc(1, sizeof(*module)); 352 if (module == NULL) { 353 return -ENOMEM; 354 } 355 356 module->name = strdup(name); 357 if (module->name == NULL) { 358 free(module); 359 return -ENOMEM; 360 } 361 362 TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq); 363 364 return 0; 365 } 366 367 int 368 spdk_iobuf_unregister_module(const char *name) 369 { 370 struct iobuf_module *module; 371 372 TAILQ_FOREACH(module, &g_iobuf.modules, tailq) { 373 if (strcmp(name, module->name) == 0) { 374 TAILQ_REMOVE(&g_iobuf.modules, module, tailq); 375 free(module->name); 376 free(module); 377 return 0; 378 } 379 } 380 381 return -ENOENT; 382 } 383 384 int 385 spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool, 386 spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx) 387 { 388 struct spdk_iobuf_entry *entry, *tmp; 389 int rc; 390 391 STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) { 392 /* We only want to iterate over the entries requested by the module which owns ch */ 393 if (entry->module != ch->module) { 394 continue; 395 } 396 397 rc = cb_fn(ch, entry, cb_ctx); 398 if (rc != 0) { 399 return rc; 400 } 401 } 402 403 return 0; 404 } 405 406 void 407 spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry, 408 uint64_t len) 409 { 410 struct spdk_iobuf_pool *pool; 411 412 if (len <= ch->small.bufsize) { 413 pool = &ch->small; 414 } else { 415 assert(len <= ch->large.bufsize); 416 pool = &ch->large; 417 } 418 419 STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq); 420 } 421 422 #define IOBUF_BATCH_SIZE 32 423 424 void * 425 spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len, 426 struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn) 427 { 428 struct spdk_iobuf_pool *pool; 429 void *buf; 430 431 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 432 if (len <= ch->small.bufsize) { 433 pool = &ch->small; 434 } else { 435 assert(len <= ch->large.bufsize); 436 pool = &ch->large; 437 } 438 439 buf = (void *)STAILQ_FIRST(&pool->cache); 440 if (buf) { 441 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 442 assert(pool->cache_count > 0); 443 pool->cache_count--; 444 } else { 445 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 446 size_t sz, i; 447 448 /* If we're going to dequeue, we may as well dequeue a batch. */ 449 sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE, 450 spdk_max(pool->cache_size, 1))); 451 if (sz == 0) { 452 if (entry) { 453 STAILQ_INSERT_TAIL(pool->queue, entry, stailq); 454 entry->module = ch->module; 455 entry->cb_fn = cb_fn; 456 } 457 458 return NULL; 459 } 460 461 for (i = 0; i < (sz - 1); i++) { 462 STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq); 463 pool->cache_count++; 464 } 465 466 /* The last one is the one we'll return */ 467 buf = bufs[i]; 468 } 469 470 return (char *)buf; 471 } 472 473 void 474 spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len) 475 { 476 struct spdk_iobuf_entry *entry; 477 struct spdk_iobuf_buffer *iobuf_buf; 478 struct spdk_iobuf_pool *pool; 479 size_t sz; 480 481 assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread()); 482 if (len <= ch->small.bufsize) { 483 pool = &ch->small; 484 } else { 485 pool = &ch->large; 486 } 487 488 if (STAILQ_EMPTY(pool->queue)) { 489 if (pool->cache_size == 0) { 490 spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL); 491 return; 492 } 493 494 iobuf_buf = (struct spdk_iobuf_buffer *)buf; 495 496 STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq); 497 pool->cache_count++; 498 499 /* The cache size may exceed the configured amount. We always dequeue from the 500 * central pool in batches of known size, so wait until at least a batch 501 * has been returned to actually return the buffers to the central pool. */ 502 sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size); 503 if (pool->cache_count >= pool->cache_size + sz) { 504 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE]; 505 size_t i; 506 507 for (i = 0; i < sz; i++) { 508 bufs[i] = STAILQ_FIRST(&pool->cache); 509 STAILQ_REMOVE_HEAD(&pool->cache, stailq); 510 assert(pool->cache_count > 0); 511 pool->cache_count--; 512 } 513 514 spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL); 515 } 516 } else { 517 entry = STAILQ_FIRST(pool->queue); 518 STAILQ_REMOVE_HEAD(pool->queue, stailq); 519 entry->cb_fn(entry, buf); 520 } 521 } 522