1 /* 2 * Copyright (c) Meta Platforms, Inc. and affiliates. 3 * All rights reserved. 4 * 5 * This source code is licensed under both the BSD-style license (found in the 6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7 * in the COPYING file in the root directory of this source tree). 8 * You may select, at your option, one of the above-listed licenses. 9 */ 10 11 #include "platform.h" 12 #include <stdio.h> /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */ 13 #include <stdlib.h> /* malloc, free */ 14 #include <assert.h> 15 #include <errno.h> /* errno */ 16 17 #if defined (_MSC_VER) 18 # include <sys/stat.h> 19 # include <io.h> 20 #endif 21 22 #include "fileio_asyncio.h" 23 #include "fileio_common.h" 24 25 /* ********************************************************************** 26 * Sparse write 27 ************************************************************************/ 28 29 /** AIO_fwriteSparse() : 30 * @return : storedSkips, 31 * argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */ 32 static unsigned 33 AIO_fwriteSparse(FILE* file, 34 const void* buffer, size_t bufferSize, 35 const FIO_prefs_t* const prefs, 36 unsigned storedSkips) 37 { 38 const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */ 39 size_t bufferSizeT = bufferSize / sizeof(size_t); 40 const size_t* const bufferTEnd = bufferT + bufferSizeT; 41 const size_t* ptrT = bufferT; 42 static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */ 43 44 if (prefs->testMode) return 0; /* do not output anything in test mode */ 45 46 if (!prefs->sparseFileSupport) { /* normal write */ 47 size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); 48 if (sizeCheck != bufferSize) 49 EXM_THROW(70, "Write error : cannot write block : %s", 50 strerror(errno)); 51 return 0; 52 } 53 54 /* avoid int overflow */ 55 if (storedSkips > 1 GB) { 56 if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) 57 EXM_THROW(91, "1 GB skip error (sparse file support)"); 58 storedSkips -= 1 GB; 59 } 60 61 while (ptrT < bufferTEnd) { 62 size_t nb0T; 63 64 /* adjust last segment if < 32 KB */ 65 size_t seg0SizeT = segmentSizeT; 66 if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT; 67 bufferSizeT -= seg0SizeT; 68 69 /* count leading zeroes */ 70 for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ; 71 storedSkips += (unsigned)(nb0T * sizeof(size_t)); 72 73 if (nb0T != seg0SizeT) { /* not all 0s */ 74 size_t const nbNon0ST = seg0SizeT - nb0T; 75 /* skip leading zeros */ 76 if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) 77 EXM_THROW(92, "Sparse skip error ; try --no-sparse"); 78 storedSkips = 0; 79 /* write the rest */ 80 if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) 81 EXM_THROW(93, "Write error : cannot write block : %s", 82 strerror(errno)); 83 } 84 ptrT += seg0SizeT; 85 } 86 87 { static size_t const maskT = sizeof(size_t)-1; 88 if (bufferSize & maskT) { 89 /* size not multiple of sizeof(size_t) : implies end of block */ 90 const char* const restStart = (const char*)bufferTEnd; 91 const char* restPtr = restStart; 92 const char* const restEnd = (const char*)buffer + bufferSize; 93 assert(restEnd > restStart && restEnd < restStart + sizeof(size_t)); 94 for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ; 95 storedSkips += (unsigned) (restPtr - restStart); 96 if (restPtr != restEnd) { 97 /* not all remaining bytes are 0 */ 98 size_t const restSize = (size_t)(restEnd - restPtr); 99 if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) 100 EXM_THROW(92, "Sparse skip error ; try --no-sparse"); 101 if (fwrite(restPtr, 1, restSize, file) != restSize) 102 EXM_THROW(95, "Write error : cannot write end of decoded block : %s", 103 strerror(errno)); 104 storedSkips = 0; 105 } } } 106 107 return storedSkips; 108 } 109 110 static void 111 AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) 112 { 113 if (prefs->testMode) assert(storedSkips == 0); 114 if (storedSkips>0) { 115 assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */ 116 (void)prefs; /* assert can be disabled, in which case prefs becomes unused */ 117 if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) 118 EXM_THROW(69, "Final skip error (sparse file support)"); 119 /* last zero must be explicitly written, 120 * so that skipped ones get implicitly translated as zero by FS */ 121 { const char lastZeroByte[1] = { 0 }; 122 if (fwrite(lastZeroByte, 1, 1, file) != 1) 123 EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno)); 124 } } 125 } 126 127 128 /* ********************************************************************** 129 * AsyncIO functionality 130 ************************************************************************/ 131 132 /* AIO_supported: 133 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */ 134 int AIO_supported(void) { 135 #ifdef ZSTD_MULTITHREAD 136 return 1; 137 #else 138 return 0; 139 #endif 140 } 141 142 /* *********************************** 143 * Generic IoPool implementation 144 *************************************/ 145 146 static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) { 147 IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t)); 148 void* const buffer = malloc(bufferSize); 149 if(!job || !buffer) 150 EXM_THROW(101, "Allocation error : not enough memory"); 151 job->buffer = buffer; 152 job->bufferSize = bufferSize; 153 job->usedBufferSize = 0; 154 job->file = NULL; 155 job->ctx = ctx; 156 job->offset = 0; 157 return job; 158 } 159 160 161 /* AIO_IOPool_createThreadPool: 162 * Creates a thread pool and a mutex for threaded IO pool. 163 * Displays warning if asyncio is requested but MT isn't available. */ 164 static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) { 165 ctx->threadPool = NULL; 166 ctx->threadPoolActive = 0; 167 if(prefs->asyncIO) { 168 if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL)) 169 EXM_THROW(102,"Failed creating ioJobsMutex mutex"); 170 /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to 171 * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ 172 assert(MAX_IO_JOBS >= 2); 173 ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2); 174 ctx->threadPoolActive = 1; 175 if (!ctx->threadPool) 176 EXM_THROW(104, "Failed creating I/O thread pool"); 177 } 178 } 179 180 /* AIO_IOPool_init: 181 * Allocates and sets and a new I/O thread pool including its included availableJobs. */ 182 static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) { 183 int i; 184 AIO_IOPool_createThreadPool(ctx, prefs); 185 ctx->prefs = prefs; 186 ctx->poolFunction = poolFunction; 187 ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2; 188 ctx->availableJobsCount = ctx->totalIoJobs; 189 for(i=0; i < ctx->availableJobsCount; i++) { 190 ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize); 191 } 192 ctx->jobBufferSize = bufferSize; 193 ctx->file = NULL; 194 } 195 196 197 /* AIO_IOPool_threadPoolActive: 198 * Check if current operation uses thread pool. 199 * Note that in some cases we have a thread pool initialized but choose not to use it. */ 200 static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) { 201 return ctx->threadPool && ctx->threadPoolActive; 202 } 203 204 205 /* AIO_IOPool_lockJobsMutex: 206 * Locks the IO jobs mutex if threading is active */ 207 static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) { 208 if(AIO_IOPool_threadPoolActive(ctx)) 209 ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); 210 } 211 212 /* AIO_IOPool_unlockJobsMutex: 213 * Unlocks the IO jobs mutex if threading is active */ 214 static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) { 215 if(AIO_IOPool_threadPoolActive(ctx)) 216 ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); 217 } 218 219 /* AIO_IOPool_releaseIoJob: 220 * Releases an acquired job back to the pool. Doesn't execute the job. */ 221 static void AIO_IOPool_releaseIoJob(IOJob_t* job) { 222 IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx; 223 AIO_IOPool_lockJobsMutex(ctx); 224 assert(ctx->availableJobsCount < ctx->totalIoJobs); 225 ctx->availableJobs[ctx->availableJobsCount++] = job; 226 AIO_IOPool_unlockJobsMutex(ctx); 227 } 228 229 /* AIO_IOPool_join: 230 * Waits for all tasks in the pool to finish executing. */ 231 static void AIO_IOPool_join(IOPoolCtx_t* ctx) { 232 if(AIO_IOPool_threadPoolActive(ctx)) 233 POOL_joinJobs(ctx->threadPool); 234 } 235 236 /* AIO_IOPool_setThreaded: 237 * Allows (de)activating threaded mode, to be used when the expected overhead 238 * of threading costs more than the expected gains. */ 239 static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) { 240 assert(threaded == 0 || threaded == 1); 241 assert(ctx != NULL); 242 if(ctx->threadPoolActive != threaded) { 243 AIO_IOPool_join(ctx); 244 ctx->threadPoolActive = threaded; 245 } 246 } 247 248 /* AIO_IOPool_free: 249 * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */ 250 static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) { 251 int i; 252 if(ctx->threadPool) { 253 /* Make sure we finish all tasks and then free the resources */ 254 AIO_IOPool_join(ctx); 255 /* Make sure we are not leaking availableJobs */ 256 assert(ctx->availableJobsCount == ctx->totalIoJobs); 257 POOL_free(ctx->threadPool); 258 ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex); 259 } 260 assert(ctx->file == NULL); 261 for(i=0; i<ctx->availableJobsCount; i++) { 262 IOJob_t* job = (IOJob_t*) ctx->availableJobs[i]; 263 free(job->buffer); 264 free(job); 265 } 266 } 267 268 /* AIO_IOPool_acquireJob: 269 * Returns an available io job to be used for a future io. */ 270 static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) { 271 IOJob_t *job; 272 assert(ctx->file != NULL || ctx->prefs->testMode); 273 AIO_IOPool_lockJobsMutex(ctx); 274 assert(ctx->availableJobsCount > 0); 275 job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount]; 276 AIO_IOPool_unlockJobsMutex(ctx); 277 job->usedBufferSize = 0; 278 job->file = ctx->file; 279 job->offset = 0; 280 return job; 281 } 282 283 284 /* AIO_IOPool_setFile: 285 * Sets the destination file for future files in the pool. 286 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */ 287 static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) { 288 assert(ctx!=NULL); 289 AIO_IOPool_join(ctx); 290 assert(ctx->availableJobsCount == ctx->totalIoJobs); 291 ctx->file = file; 292 } 293 294 static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) { 295 return ctx->file; 296 } 297 298 /* AIO_IOPool_enqueueJob: 299 * Enqueues an io job for execution. 300 * The queued job shouldn't be used directly after queueing it. */ 301 static void AIO_IOPool_enqueueJob(IOJob_t* job) { 302 IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx; 303 if(AIO_IOPool_threadPoolActive(ctx)) 304 POOL_add(ctx->threadPool, ctx->poolFunction, job); 305 else 306 ctx->poolFunction(job); 307 } 308 309 /* *********************************** 310 * WritePool implementation 311 *************************************/ 312 313 /* AIO_WritePool_acquireJob: 314 * Returns an available write job to be used for a future write. */ 315 IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) { 316 return AIO_IOPool_acquireJob(&ctx->base); 317 } 318 319 /* AIO_WritePool_enqueueAndReacquireWriteJob: 320 * Queues a write job for execution and acquires a new one. 321 * After execution `job`'s pointed value would change to the newly acquired job. 322 * Make sure to set `usedBufferSize` to the wanted length before call. 323 * The queued job shouldn't be used directly after queueing it. */ 324 void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) { 325 AIO_IOPool_enqueueJob(*job); 326 *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx); 327 } 328 329 /* AIO_WritePool_sparseWriteEnd: 330 * Ends sparse writes to the current file. 331 * Blocks on completion of all current write jobs before executing. */ 332 void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) { 333 assert(ctx != NULL); 334 AIO_IOPool_join(&ctx->base); 335 AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips); 336 ctx->storedSkips = 0; 337 } 338 339 /* AIO_WritePool_setFile: 340 * Sets the destination file for future writes in the pool. 341 * Requires completion of all queues write jobs and release of all otherwise acquired jobs. 342 * Also requires ending of sparse write if a previous file was used in sparse mode. */ 343 void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) { 344 AIO_IOPool_setFile(&ctx->base, file); 345 assert(ctx->storedSkips == 0); 346 } 347 348 /* AIO_WritePool_getFile: 349 * Returns the file the writePool is currently set to write to. */ 350 FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) { 351 return AIO_IOPool_getFile(&ctx->base); 352 } 353 354 /* AIO_WritePool_releaseIoJob: 355 * Releases an acquired job back to the pool. Doesn't execute the job. */ 356 void AIO_WritePool_releaseIoJob(IOJob_t* job) { 357 AIO_IOPool_releaseIoJob(job); 358 } 359 360 /* AIO_WritePool_closeFile: 361 * Ends sparse write and closes the writePool's current file and sets the file to NULL. 362 * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ 363 int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) { 364 FILE* const dstFile = ctx->base.file; 365 assert(dstFile!=NULL || ctx->base.prefs->testMode!=0); 366 AIO_WritePool_sparseWriteEnd(ctx); 367 AIO_IOPool_setFile(&ctx->base, NULL); 368 return fclose(dstFile); 369 } 370 371 /* AIO_WritePool_executeWriteJob: 372 * Executes a write job synchronously. Can be used as a function for a thread pool. */ 373 static void AIO_WritePool_executeWriteJob(void* opaque){ 374 IOJob_t* const job = (IOJob_t*) opaque; 375 WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx; 376 ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips); 377 AIO_IOPool_releaseIoJob(job); 378 } 379 380 /* AIO_WritePool_create: 381 * Allocates and sets and a new write pool including its included jobs. */ 382 WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) { 383 WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t)); 384 if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); 385 AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize); 386 ctx->storedSkips = 0; 387 return ctx; 388 } 389 390 /* AIO_WritePool_free: 391 * Frees and releases a writePool and its resources. Closes destination file if needs to. */ 392 void AIO_WritePool_free(WritePoolCtx_t* ctx) { 393 /* Make sure we finish all tasks and then free the resources */ 394 if(AIO_WritePool_getFile(ctx)) 395 AIO_WritePool_closeFile(ctx); 396 AIO_IOPool_destroy(&ctx->base); 397 assert(ctx->storedSkips==0); 398 free(ctx); 399 } 400 401 /* AIO_WritePool_setAsync: 402 * Allows (de)activating async mode, to be used when the expected overhead 403 * of asyncio costs more than the expected gains. */ 404 void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) { 405 AIO_IOPool_setThreaded(&ctx->base, async); 406 } 407 408 409 /* *********************************** 410 * ReadPool implementation 411 *************************************/ 412 static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) { 413 int i; 414 for(i=0; i<ctx->completedJobsCount; i++) { 415 IOJob_t* job = (IOJob_t*) ctx->completedJobs[i]; 416 AIO_IOPool_releaseIoJob(job); 417 } 418 ctx->completedJobsCount = 0; 419 } 420 421 static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) { 422 ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx; 423 AIO_IOPool_lockJobsMutex(&ctx->base); 424 assert(ctx->completedJobsCount < MAX_IO_JOBS); 425 ctx->completedJobs[ctx->completedJobsCount++] = job; 426 if(AIO_IOPool_threadPoolActive(&ctx->base)) { 427 ZSTD_pthread_cond_signal(&ctx->jobCompletedCond); 428 } 429 AIO_IOPool_unlockJobsMutex(&ctx->base); 430 } 431 432 /* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked: 433 * Looks through the completed jobs for a job matching the waitingOnOffset and returns it, 434 * if job wasn't found returns NULL. 435 * IMPORTANT: assumes ioJobsMutex is locked. */ 436 static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) { 437 IOJob_t *job = NULL; 438 int i; 439 /* This implementation goes through all completed jobs and looks for the one matching the next offset. 440 * While not strictly needed for a single threaded reader implementation (as in such a case we could expect 441 * reads to be completed in order) this implementation was chosen as it better fits other asyncio 442 * interfaces (such as io_uring) that do not provide promises regarding order of completion. */ 443 for (i=0; i<ctx->completedJobsCount; i++) { 444 job = (IOJob_t *) ctx->completedJobs[i]; 445 if (job->offset == ctx->waitingOnOffset) { 446 ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount]; 447 return job; 448 } 449 } 450 return NULL; 451 } 452 453 /* AIO_ReadPool_numReadsInFlight: 454 * Returns the number of IO read jobs currently in flight. */ 455 static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) { 456 const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1); 457 return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld)); 458 } 459 460 /* AIO_ReadPool_getNextCompletedJob: 461 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset. 462 * Would block. */ 463 static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) { 464 IOJob_t *job = NULL; 465 AIO_IOPool_lockJobsMutex(&ctx->base); 466 467 job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx); 468 469 /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */ 470 while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) { 471 assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */ 472 ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex); 473 job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx); 474 } 475 476 if(job) { 477 assert(job->offset == ctx->waitingOnOffset); 478 ctx->waitingOnOffset += job->usedBufferSize; 479 } 480 481 AIO_IOPool_unlockJobsMutex(&ctx->base); 482 return job; 483 } 484 485 486 /* AIO_ReadPool_executeReadJob: 487 * Executes a read job synchronously. Can be used as a function for a thread pool. */ 488 static void AIO_ReadPool_executeReadJob(void* opaque){ 489 IOJob_t* const job = (IOJob_t*) opaque; 490 ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx; 491 if(ctx->reachedEof) { 492 job->usedBufferSize = 0; 493 AIO_ReadPool_addJobToCompleted(job); 494 return; 495 } 496 job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file); 497 if(job->usedBufferSize < job->bufferSize) { 498 if(ferror(job->file)) { 499 EXM_THROW(37, "Read error"); 500 } else if(feof(job->file)) { 501 ctx->reachedEof = 1; 502 } else { 503 EXM_THROW(37, "Unexpected short read"); 504 } 505 } 506 AIO_ReadPool_addJobToCompleted(job); 507 } 508 509 static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) { 510 IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base); 511 job->offset = ctx->nextReadOffset; 512 ctx->nextReadOffset += job->bufferSize; 513 AIO_IOPool_enqueueJob(job); 514 } 515 516 static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) { 517 while(ctx->base.availableJobsCount) { 518 AIO_ReadPool_enqueueRead(ctx); 519 } 520 } 521 522 /* AIO_ReadPool_setFile: 523 * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. 524 * Waits for all current enqueued tasks to complete if a previous file was set. */ 525 void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) { 526 assert(ctx!=NULL); 527 AIO_IOPool_join(&ctx->base); 528 AIO_ReadPool_releaseAllCompletedJobs(ctx); 529 if (ctx->currentJobHeld) { 530 AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld); 531 ctx->currentJobHeld = NULL; 532 } 533 AIO_IOPool_setFile(&ctx->base, file); 534 ctx->nextReadOffset = 0; 535 ctx->waitingOnOffset = 0; 536 ctx->srcBuffer = ctx->coalesceBuffer; 537 ctx->srcBufferLoaded = 0; 538 ctx->reachedEof = 0; 539 if(file != NULL) 540 AIO_ReadPool_startReading(ctx); 541 } 542 543 /* AIO_ReadPool_create: 544 * Allocates and sets and a new readPool including its included jobs. 545 * bufferSize should be set to the maximal buffer we want to read at a time, will also be used 546 * as our basic read size. */ 547 ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) { 548 ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t)); 549 if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); 550 AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize); 551 552 ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2); 553 if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory"); 554 ctx->srcBuffer = ctx->coalesceBuffer; 555 ctx->srcBufferLoaded = 0; 556 ctx->completedJobsCount = 0; 557 ctx->currentJobHeld = NULL; 558 559 if(ctx->base.threadPool) 560 if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL)) 561 EXM_THROW(103,"Failed creating jobCompletedCond cond"); 562 563 return ctx; 564 } 565 566 /* AIO_ReadPool_free: 567 * Frees and releases a readPool and its resources. Closes source file. */ 568 void AIO_ReadPool_free(ReadPoolCtx_t* ctx) { 569 if(AIO_ReadPool_getFile(ctx)) 570 AIO_ReadPool_closeFile(ctx); 571 if(ctx->base.threadPool) 572 ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond); 573 AIO_IOPool_destroy(&ctx->base); 574 free(ctx->coalesceBuffer); 575 free(ctx); 576 } 577 578 /* AIO_ReadPool_consumeBytes: 579 * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ 580 void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) { 581 assert(n <= ctx->srcBufferLoaded); 582 ctx->srcBufferLoaded -= n; 583 ctx->srcBuffer += n; 584 } 585 586 /* AIO_ReadPool_releaseCurrentlyHeldAndGetNext: 587 * Release the current held job and get the next one, returns NULL if no next job available. */ 588 static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) { 589 if (ctx->currentJobHeld) { 590 AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld); 591 ctx->currentJobHeld = NULL; 592 AIO_ReadPool_enqueueRead(ctx); 593 } 594 ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx); 595 return (IOJob_t*) ctx->currentJobHeld; 596 } 597 598 /* AIO_ReadPool_fillBuffer: 599 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller). 600 * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file. 601 * Return value is the number of bytes added to the buffer. 602 * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */ 603 size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) { 604 IOJob_t *job; 605 int useCoalesce = 0; 606 if(n > ctx->base.jobBufferSize) 607 n = ctx->base.jobBufferSize; 608 609 /* We are good, don't read anything */ 610 if (ctx->srcBufferLoaded >= n) 611 return 0; 612 613 /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job 614 * and coalesce the remaining bytes with the next job's buffer */ 615 if (ctx->srcBufferLoaded > 0) { 616 useCoalesce = 1; 617 memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded); 618 ctx->srcBuffer = ctx->coalesceBuffer; 619 } 620 621 /* Read the next chunk */ 622 job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx); 623 if(!job) 624 return 0; 625 if(useCoalesce) { 626 assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize); 627 memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize); 628 ctx->srcBufferLoaded += job->usedBufferSize; 629 } 630 else { 631 ctx->srcBuffer = (U8 *) job->buffer; 632 ctx->srcBufferLoaded = job->usedBufferSize; 633 } 634 return job->usedBufferSize; 635 } 636 637 /* AIO_ReadPool_consumeAndRefill: 638 * Consumes the current buffer and refills it with bufferSize bytes. */ 639 size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) { 640 AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded); 641 return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize); 642 } 643 644 /* AIO_ReadPool_getFile: 645 * Returns the current file set for the read pool. */ 646 FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) { 647 return AIO_IOPool_getFile(&ctx->base); 648 } 649 650 /* AIO_ReadPool_closeFile: 651 * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ 652 int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) { 653 FILE* const file = AIO_ReadPool_getFile(ctx); 654 AIO_ReadPool_setFile(ctx, NULL); 655 return fclose(file); 656 } 657 658 /* AIO_ReadPool_setAsync: 659 * Allows (de)activating async mode, to be used when the expected overhead 660 * of asyncio costs more than the expected gains. */ 661 void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) { 662 AIO_IOPool_setThreaded(&ctx->base, async); 663 } 664