/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

#include "platform.h"
#include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h>     /* malloc, free */
#include <assert.h>
#include <errno.h>      /* errno */

#if defined (_MSC_VER)
#  include <sys/stat.h>
#  include <io.h>
#endif

#include "fileio_asyncio.h"
#include "fileio_common.h"

/* **********************************************************************
 *  Sparse write
 ************************************************************************/

/** AIO_fwriteSparse() :
 *  @return : storedSkips,
 *            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        return 0;
    }

    /* avoid int overflow */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
            EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;
        /* count leading zeroes */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                              strerror(errno));
                storedSkips = 0;
    }   }   }

    return storedSkips;
}

static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    if (prefs->testMode) assert(storedSkips == 0);
    if (storedSkips>0) {
        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
            EXM_THROW(69, "Final skip error (sparse file support)");
        /* last zero must be explicitly written,
         * so that skipped ones get implicitly translated as zero by FS */
        {   const char lastZeroByte[1] = { 0 };
            if (fwrite(lastZeroByte, 1, 1, file) != 1)
                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
    }   }
}
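
/* Usage sketch (illustrative only, not part of this file's API): the real
 * callers of this pair live below, in AIO_WritePool_executeWriteJob() and
 * AIO_WritePool_sparseWriteEnd(). `storedSkips` carries the count of pending
 * zero bytes from one call to the next, and AIO_fwriteSparseEnd() must run
 * once per file so the final hole is materialized. `getNextDecodedBlock` is
 * a hypothetical producer:
 *
 *     unsigned storedSkips = 0;
 *     size_t blockSize;
 *     while ((blockSize = getNextDecodedBlock(buffer, bufferCapacity)) > 0)
 *         storedSkips = AIO_fwriteSparse(file, buffer, blockSize, prefs, storedSkips);
 *     AIO_fwriteSparseEnd(prefs, file, storedSkips);
 */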

/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/

/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    return 1;
#else
    return 0;
#endif
}

/* ***********************************
 *  Generic IoPool implementation
 *************************************/

static IOJob_t* AIO_IOPool_createIoJob(IOPoolCtx_t* ctx, size_t bufferSize) {
    IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
    void* const buffer = malloc(bufferSize);
    if(!job || !buffer)
        EXM_THROW(101, "Allocation error : not enough memory");
    job->buffer = buffer;
    job->bufferSize = bufferSize;
    job->usedBufferSize = 0;
    job->file = NULL;
    job->ctx = ctx;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for threaded IO pool.
 * Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if(prefs->asyncIO) {
        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
            EXM_THROW(102, "Failed creating ioJobsMutex mutex");
        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
        assert(MAX_IO_JOBS >= 2);
        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
        ctx->threadPoolActive = 1;
        if (!ctx->threadPool)
            EXM_THROW(104, "Failed creating I/O thread pool");
    }
}

/* AIO_IOPool_init:
 * Allocates and initializes a new I/O pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
    int i;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
    ctx->availableJobsCount = ctx->totalIoJobs;
    for(i = 0; i < ctx->availableJobsCount; i++) {
        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
}


/* AIO_IOPool_threadPoolActive:
 * Check if current operation uses thread pool.
 * Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
    return ctx->threadPool && ctx->threadPoolActive;
}


/* AIO_IOPool_lockJobsMutex:
 * Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_unlockJobsMutex:
 * Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t*) job->ctx;
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount < ctx->totalIoJobs);
    ctx->availableJobs[ctx->availableJobsCount++] = job;
    AIO_IOPool_unlockJobsMutex(ctx);
}

/* AIO_IOPool_join:
 * Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_setThreaded:
 * Allows (de)activating threaded mode, to be used when the expected overhead
 * of threading costs more than the expected gains. */
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
    assert(threaded == 0 || threaded == 1);
    assert(ctx != NULL);
    if(ctx->threadPoolActive != threaded) {
        AIO_IOPool_join(ctx);
        ctx->threadPoolActive = threaded;
    }
}

/* AIO_IOPool_destroy:
 * Releases a previously allocated I/O pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
    int i;
    if(ctx->threadPool) {
        /* Make sure we finish all tasks and then free the resources */
        AIO_IOPool_join(ctx);
        /* Make sure we are not leaking availableJobs */
        assert(ctx->availableJobsCount == ctx->totalIoJobs);
        POOL_free(ctx->threadPool);
        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
    }
    assert(ctx->file == NULL);
    for(i = 0; i < ctx->availableJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
        free(job->buffer);
        free(job);
    }
}

/* AIO_IOPool_acquireJob:
 * Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
    IOJob_t* job;
    assert(ctx->file != NULL || ctx->prefs->testMode);
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount > 0);
    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
    AIO_IOPool_unlockJobsMutex(ctx);
    job->usedBufferSize = 0;
    job->file = ctx->file;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_setFile:
 * Sets the destination file for future jobs in the pool.
 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
    assert(ctx != NULL);
    AIO_IOPool_join(ctx);
    assert(ctx->availableJobsCount == ctx->totalIoJobs);
    ctx->file = file;
}

static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
    return ctx->file;
}

/* AIO_IOPool_enqueueJob:
 * Enqueues an io job for execution.
 * The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t*) job->ctx;
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_add(ctx->threadPool, ctx->poolFunction, job);
    else
        ctx->poolFunction(job);
}
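
/* Lifecycle sketch for the generic pool (illustrative; the WritePool and
 * ReadPool below wrap exactly this sequence). A job is acquired, its buffer
 * filled, then enqueued; each pool's poolFunction is responsible for routing
 * the job back, either directly via AIO_IOPool_releaseIoJob() (write pool) or
 * by parking it in completedJobs first (read pool). `fillWithPayload` is a
 * hypothetical helper:
 *
 *     AIO_IOPool_setFile(ctx, file);
 *     {   IOJob_t* job = AIO_IOPool_acquireJob(ctx);
 *         job->usedBufferSize = fillWithPayload(job->buffer, job->bufferSize);
 *         AIO_IOPool_enqueueJob(job);   // runs ctx->poolFunction(job), inline or on a pool thread
 *     }
 *     AIO_IOPool_join(ctx);
 *     AIO_IOPool_setFile(ctx, NULL);
 */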

/* ***********************************
 *  WritePool implementation
 *************************************/

/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
    return AIO_IOPool_acquireJob(&ctx->base);
}

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Queues a write job for execution and acquires a new one.
 * On return, `*job` points to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before the call.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t** job) {
    AIO_IOPool_enqueueJob(*job);
    *job = AIO_IOPool_acquireJob((IOPoolCtx_t*) (*job)->ctx);
}

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
    ctx->storedSkips = 0;
}

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
    AIO_IOPool_setFile(&ctx->base, file);
    assert(ctx->storedSkips == 0);
}

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_closeFile:
 * Ends sparse write, closes the writePool's current file, and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
    FILE* const dstFile = ctx->base.file;
    assert(dstFile != NULL || ctx->base.prefs->testMode != 0);
    AIO_WritePool_sparseWriteEnd(ctx);
    AIO_IOPool_setFile(&ctx->base, NULL);
    return fclose(dstFile);
}

/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque) {
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;
    return ctx;
}

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes the destination file if needed. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure we finish all tasks and then free the resources */
    if(AIO_WritePool_getFile(ctx))
        AIO_WritePool_closeFile(ctx);
    AIO_IOPool_destroy(&ctx->base);
    assert(ctx->storedSkips == 0);
    free(ctx);
}
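
/* Typical write-path usage (a condensed sketch of how a caller such as
 * fileio.c drives the write pool; error handling elided, and hasMoreOutput()
 * / produceInto() are hypothetical stand-ins for the compression loop):
 *
 *     WritePoolCtx_t* wp = AIO_WritePool_create(prefs, bufferSize);
 *     AIO_WritePool_setFile(wp, dstFile);
 *     {   IOJob_t* job = AIO_WritePool_acquireJob(wp);
 *         while (hasMoreOutput()) {
 *             job->usedBufferSize = produceInto(job->buffer, job->bufferSize);
 *             AIO_WritePool_enqueueAndReacquireWriteJob(&job);
 *         }
 *         AIO_WritePool_releaseIoJob(job);
 *     }
 *     AIO_WritePool_closeFile(wp);   // also ends sparse mode for this file
 *     AIO_WritePool_free(wp);
 */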

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}


/* ***********************************
 *  ReadPool implementation
 *************************************/

static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int i;
    for(i = 0; i < ctx->completedJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
        AIO_IOPool_releaseIoJob(job);
    }
    ctx->completedJobsCount = 0;
}

static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching waitingOnOffset and returns it;
 * returns NULL if no such job was found.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t* job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i = 0; i < ctx->completedJobsCount; i++) {
        job = (IOJob_t*) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}

/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    const int jobsHeld = (ctx->currentJobHeld == NULL ? 0 : 1);
    return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * May block until such a job is available. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t* job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight, continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL);  /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}
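
/* Worked example: with three reads in flight at offsets 0, 64 KB and 128 KB,
 * completions may arrive as 128 KB, 0, 64 KB. A call with waitingOnOffset==0
 * leaves the 128 KB job parked in completedJobs, waits on jobCompletedCond,
 * and returns the offset-0 job once it lands; the next call returns the 64 KB
 * job only after it too completes. Ordering is restored by the consumer
 * rather than assumed from the I/O backend. */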

/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque) {
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) job->ctx;
    if(ctx->reachedEof) {
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}

static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
    job->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += job->bufferSize;
    AIO_IOPool_enqueueJob(job);
}

static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while(ctx->base.availableJobsCount) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}

/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t*) ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time; it is also used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
    if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->completedJobsCount = 0;
    ctx->currentJobHeld = NULL;

    if(ctx->base.threadPool)
        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103, "Failed creating jobCompletedCond cond");

    return ctx;
}

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes the source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    assert(n <= ctx->srcBufferLoaded);
    ctx->srcBufferLoaded -= n;
    ctx->srcBuffer += n;
}

/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t*) ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
    return (IOJob_t*) ctx->currentJobHeld;
}
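
/* Coalescing note for AIO_ReadPool_fillBuffer() below (worked example,
 * assuming jobBufferSize == 64 KB): if 10 bytes are still loaded and the
 * caller asks for 100, the 10 leftover bytes are memcpy'd to the start of
 * coalesceBuffer and the next completed job's payload is appended right after
 * them, so the caller always sees one contiguous region. This is why
 * AIO_ReadPool_create() allocates coalesceBuffer at 2 * bufferSize:
 *
 *     [ 10 leftover bytes | up to jobBufferSize fresh bytes | unused ]
 *       ^ srcBuffer == coalesceBuffer, srcBufferLoaded == 10 + fresh
 */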

/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns when srcBuffer has at least the expected number of bytes loaded, or when the end of the file is reached.
 * The return value is the number of bytes added to the buffer.
 * Note that srcBuffer might contain up to 2 * jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t* job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;
    if(useCoalesce) {
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        ctx->srcBuffer = (U8*) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    FILE* const file = AIO_ReadPool_getFile(ctx);
    AIO_ReadPool_setFile(ctx, NULL);
    return fclose(file);
}

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}
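
/* Typical read-path usage (a condensed sketch of how a caller such as
 * fileio.c consumes the read pool; error handling elided, and blockSize /
 * decodeSome() are hypothetical stand-ins for the decompression loop):
 *
 *     ReadPoolCtx_t* rp = AIO_ReadPool_create(prefs, bufferSize);
 *     AIO_ReadPool_setFile(rp, srcFile);   // also kicks off the first reads
 *     for (;;) {
 *         AIO_ReadPool_fillBuffer(rp, blockSize);
 *         if (rp->srcBufferLoaded == 0) break;   // end of file
 *         {   size_t const consumed = decodeSome(rp->srcBuffer, rp->srcBufferLoaded);
 *             AIO_ReadPool_consumeBytes(rp, consumed);
 *         }
 *     }
 *     AIO_ReadPool_closeFile(rp);
 *     AIO_ReadPool_free(rp);
 */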