1*3117ece4Schristos /* 2*3117ece4Schristos * Copyright (c) Meta Platforms, Inc. and affiliates. 3*3117ece4Schristos * All rights reserved. 4*3117ece4Schristos * 5*3117ece4Schristos * This source code is licensed under both the BSD-style license (found in the 6*3117ece4Schristos * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7*3117ece4Schristos * in the COPYING file in the root directory of this source tree). 8*3117ece4Schristos * You may select, at your option, one of the above-listed licenses. 9*3117ece4Schristos */ 10*3117ece4Schristos 11*3117ece4Schristos /* 12*3117ece4Schristos * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously. 13*3117ece4Schristos * Current implementation relies on having one thread that reads and one that 14*3117ece4Schristos * writes. 15*3117ece4Schristos * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but 16*3117ece4Schristos * are performed serially by the appropriate worker thread. 17*3117ece4Schristos * Most systems exposes better primitives to perform asynchronous IO, such as 18*3117ece4Schristos * io_uring on newer linux systems. The API is built in such a way that in the 19*3117ece4Schristos * future we could replace the threads with better solutions when available. 20*3117ece4Schristos */ 21*3117ece4Schristos 22*3117ece4Schristos #ifndef ZSTD_FILEIO_ASYNCIO_H 23*3117ece4Schristos #define ZSTD_FILEIO_ASYNCIO_H 24*3117ece4Schristos 25*3117ece4Schristos #if defined (__cplusplus) 26*3117ece4Schristos extern "C" { 27*3117ece4Schristos #endif 28*3117ece4Schristos 29*3117ece4Schristos #include "../lib/common/mem.h" /* U32, U64 */ 30*3117ece4Schristos #include "fileio_types.h" 31*3117ece4Schristos #include "platform.h" 32*3117ece4Schristos #include "util.h" 33*3117ece4Schristos #include "../lib/common/pool.h" 34*3117ece4Schristos #include "../lib/common/threading.h" 35*3117ece4Schristos 36*3117ece4Schristos #define MAX_IO_JOBS (10) 37*3117ece4Schristos 38*3117ece4Schristos typedef struct { 39*3117ece4Schristos /* These struct fields should be set only on creation and not changed afterwards */ 40*3117ece4Schristos POOL_ctx* threadPool; 41*3117ece4Schristos int threadPoolActive; 42*3117ece4Schristos int totalIoJobs; 43*3117ece4Schristos const FIO_prefs_t* prefs; 44*3117ece4Schristos POOL_function poolFunction; 45*3117ece4Schristos 46*3117ece4Schristos /* Controls the file we currently write to, make changes only by using provided utility functions */ 47*3117ece4Schristos FILE* file; 48*3117ece4Schristos 49*3117ece4Schristos /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should 50*3117ece4Schristos * only be mutated after locking the mutex */ 51*3117ece4Schristos ZSTD_pthread_mutex_t ioJobsMutex; 52*3117ece4Schristos void* availableJobs[MAX_IO_JOBS]; 53*3117ece4Schristos int availableJobsCount; 54*3117ece4Schristos size_t jobBufferSize; 55*3117ece4Schristos } IOPoolCtx_t; 56*3117ece4Schristos 57*3117ece4Schristos typedef struct { 58*3117ece4Schristos IOPoolCtx_t base; 59*3117ece4Schristos 60*3117ece4Schristos /* State regarding the currently read file */ 61*3117ece4Schristos int reachedEof; 62*3117ece4Schristos U64 nextReadOffset; 63*3117ece4Schristos U64 waitingOnOffset; 64*3117ece4Schristos 65*3117ece4Schristos /* We may hold an IOJob object as needed if we actively expose its buffer. */ 66*3117ece4Schristos void *currentJobHeld; 67*3117ece4Schristos 68*3117ece4Schristos /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in 69*3117ece4Schristos * the first of them. Shouldn't be accessed from outside ot utility functions. */ 70*3117ece4Schristos U8 *coalesceBuffer; 71*3117ece4Schristos 72*3117ece4Schristos /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might 73*3117ece4Schristos * change when consuming / refilling buffer. */ 74*3117ece4Schristos U8 *srcBuffer; 75*3117ece4Schristos size_t srcBufferLoaded; 76*3117ece4Schristos 77*3117ece4Schristos /* We need to know what tasks completed so we can use their buffers when their time comes. 78*3117ece4Schristos * Should only be accessed after locking base.ioJobsMutex . */ 79*3117ece4Schristos void* completedJobs[MAX_IO_JOBS]; 80*3117ece4Schristos int completedJobsCount; 81*3117ece4Schristos ZSTD_pthread_cond_t jobCompletedCond; 82*3117ece4Schristos } ReadPoolCtx_t; 83*3117ece4Schristos 84*3117ece4Schristos typedef struct { 85*3117ece4Schristos IOPoolCtx_t base; 86*3117ece4Schristos unsigned storedSkips; 87*3117ece4Schristos } WritePoolCtx_t; 88*3117ece4Schristos 89*3117ece4Schristos typedef struct { 90*3117ece4Schristos /* These fields are automatically set and shouldn't be changed by non WritePool code. */ 91*3117ece4Schristos void *ctx; 92*3117ece4Schristos FILE* file; 93*3117ece4Schristos void *buffer; 94*3117ece4Schristos size_t bufferSize; 95*3117ece4Schristos 96*3117ece4Schristos /* This field should be changed before a job is queued for execution and should contain the number 97*3117ece4Schristos * of bytes to write from the buffer. */ 98*3117ece4Schristos size_t usedBufferSize; 99*3117ece4Schristos U64 offset; 100*3117ece4Schristos } IOJob_t; 101*3117ece4Schristos 102*3117ece4Schristos /* AIO_supported: 103*3117ece4Schristos * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */ 104*3117ece4Schristos int AIO_supported(void); 105*3117ece4Schristos 106*3117ece4Schristos 107*3117ece4Schristos /* AIO_WritePool_releaseIoJob: 108*3117ece4Schristos * Releases an acquired job back to the pool. Doesn't execute the job. */ 109*3117ece4Schristos void AIO_WritePool_releaseIoJob(IOJob_t *job); 110*3117ece4Schristos 111*3117ece4Schristos /* AIO_WritePool_acquireJob: 112*3117ece4Schristos * Returns an available write job to be used for a future write. */ 113*3117ece4Schristos IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx); 114*3117ece4Schristos 115*3117ece4Schristos /* AIO_WritePool_enqueueAndReacquireWriteJob: 116*3117ece4Schristos * Enqueues a write job for execution and acquires a new one. 117*3117ece4Schristos * After execution `job`'s pointed value would change to the newly acquired job. 118*3117ece4Schristos * Make sure to set `usedBufferSize` to the wanted length before call. 119*3117ece4Schristos * The queued job shouldn't be used directly after queueing it. */ 120*3117ece4Schristos void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job); 121*3117ece4Schristos 122*3117ece4Schristos /* AIO_WritePool_sparseWriteEnd: 123*3117ece4Schristos * Ends sparse writes to the current file. 124*3117ece4Schristos * Blocks on completion of all current write jobs before executing. */ 125*3117ece4Schristos void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx); 126*3117ece4Schristos 127*3117ece4Schristos /* AIO_WritePool_setFile: 128*3117ece4Schristos * Sets the destination file for future writes in the pool. 129*3117ece4Schristos * Requires completion of all queues write jobs and release of all otherwise acquired jobs. 130*3117ece4Schristos * Also requires ending of sparse write if a previous file was used in sparse mode. */ 131*3117ece4Schristos void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file); 132*3117ece4Schristos 133*3117ece4Schristos /* AIO_WritePool_getFile: 134*3117ece4Schristos * Returns the file the writePool is currently set to write to. */ 135*3117ece4Schristos FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx); 136*3117ece4Schristos 137*3117ece4Schristos /* AIO_WritePool_closeFile: 138*3117ece4Schristos * Ends sparse write and closes the writePool's current file and sets the file to NULL. 139*3117ece4Schristos * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ 140*3117ece4Schristos int AIO_WritePool_closeFile(WritePoolCtx_t *ctx); 141*3117ece4Schristos 142*3117ece4Schristos /* AIO_WritePool_create: 143*3117ece4Schristos * Allocates and sets and a new write pool including its included jobs. 144*3117ece4Schristos * bufferSize should be set to the maximal buffer we want to write to at a time. */ 145*3117ece4Schristos WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize); 146*3117ece4Schristos 147*3117ece4Schristos /* AIO_WritePool_free: 148*3117ece4Schristos * Frees and releases a writePool and its resources. Closes destination file. */ 149*3117ece4Schristos void AIO_WritePool_free(WritePoolCtx_t* ctx); 150*3117ece4Schristos 151*3117ece4Schristos /* AIO_WritePool_setAsync: 152*3117ece4Schristos * Allows (de)activating async mode, to be used when the expected overhead 153*3117ece4Schristos * of asyncio costs more than the expected gains. */ 154*3117ece4Schristos void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async); 155*3117ece4Schristos 156*3117ece4Schristos /* AIO_ReadPool_create: 157*3117ece4Schristos * Allocates and sets and a new readPool including its included jobs. 158*3117ece4Schristos * bufferSize should be set to the maximal buffer we want to read at a time, will also be used 159*3117ece4Schristos * as our basic read size. */ 160*3117ece4Schristos ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize); 161*3117ece4Schristos 162*3117ece4Schristos /* AIO_ReadPool_free: 163*3117ece4Schristos * Frees and releases a readPool and its resources. Closes source file. */ 164*3117ece4Schristos void AIO_ReadPool_free(ReadPoolCtx_t* ctx); 165*3117ece4Schristos 166*3117ece4Schristos /* AIO_ReadPool_setAsync: 167*3117ece4Schristos * Allows (de)activating async mode, to be used when the expected overhead 168*3117ece4Schristos * of asyncio costs more than the expected gains. */ 169*3117ece4Schristos void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async); 170*3117ece4Schristos 171*3117ece4Schristos /* AIO_ReadPool_consumeBytes: 172*3117ece4Schristos * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ 173*3117ece4Schristos void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n); 174*3117ece4Schristos 175*3117ece4Schristos /* AIO_ReadPool_fillBuffer: 176*3117ece4Schristos * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize). 177*3117ece4Schristos * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file. 178*3117ece4Schristos * Return value is the number of bytes added to the buffer. 179*3117ece4Schristos * Note that srcBuffer might have up to 2 times bufferSize bytes. */ 180*3117ece4Schristos size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n); 181*3117ece4Schristos 182*3117ece4Schristos /* AIO_ReadPool_consumeAndRefill: 183*3117ece4Schristos * Consumes the current buffer and refills it with bufferSize bytes. */ 184*3117ece4Schristos size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx); 185*3117ece4Schristos 186*3117ece4Schristos /* AIO_ReadPool_setFile: 187*3117ece4Schristos * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. 188*3117ece4Schristos * Waits for all current enqueued tasks to complete if a previous file was set. */ 189*3117ece4Schristos void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file); 190*3117ece4Schristos 191*3117ece4Schristos /* AIO_ReadPool_getFile: 192*3117ece4Schristos * Returns the current file set for the read pool. */ 193*3117ece4Schristos FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx); 194*3117ece4Schristos 195*3117ece4Schristos /* AIO_ReadPool_closeFile: 196*3117ece4Schristos * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ 197*3117ece4Schristos int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx); 198*3117ece4Schristos 199*3117ece4Schristos #if defined (__cplusplus) 200*3117ece4Schristos } 201*3117ece4Schristos #endif 202*3117ece4Schristos 203*3117ece4Schristos #endif /* ZSTD_FILEIO_ASYNCIO_H */ 204