/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

#include "platform.h"
#include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h>     /* malloc, free */
#include <assert.h>
#include <errno.h>      /* errno */

#if defined (_MSC_VER)
#  include <sys/stat.h>
#  include <io.h>
#endif

#include "fileio_asyncio.h"
#include "fileio_common.h"

/* **********************************************************************
 *  Sparse write
 ************************************************************************/

/** AIO_fwriteSparse() :
*  @return : storedSkips,
*            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is expected to be malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        return 0;
    }

    /* avoid int overflow */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
            EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;

        /* count leading zeroes */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                              strerror(errno));
                storedSkips = 0;
            }   }   }

    return storedSkips;
}
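
/* Illustrative note : a 64 KB block consisting entirely of zeros produces no
 * write at all ; the 64 KB is simply returned as storedSkips and carried into
 * the next call, where it becomes a single LONG_SEEK before the first
 * non-zero write. Callers must finish with AIO_fwriteSparseEnd() so that a
 * trailing run of zeros is still reflected in the final file size. */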

static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    if (prefs->testMode) assert(storedSkips == 0);
    if (storedSkips>0) {
        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
            EXM_THROW(69, "Final skip error (sparse file support)");
        /* last zero must be explicitly written :
         * seeking past end-of-file does not by itself extend the file,
         * so the skipped bytes only become implicit zeroes in the FS
         * once this final byte sets the file's length */
        {   const char lastZeroByte[1] = { 0 };
            if (fwrite(lastZeroByte, 1, 1, file) != 1)
                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
        }   }
}


/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/

/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    return 1;
#else
    return 0;
#endif
}

/* ***********************************
 *  Generic IoPool implementation
 *************************************/

static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
    IOJob_t* const job  = (IOJob_t*) malloc(sizeof(IOJob_t));
    void* const buffer = malloc(bufferSize);
    if(!job || !buffer)
        EXM_THROW(101, "Allocation error : not enough memory");
    job->buffer = buffer;
    job->bufferSize = bufferSize;
    job->usedBufferSize = 0;
    job->file = NULL;
    job->ctx = ctx;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for a threaded IO pool.
 * Displays a warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if(prefs->asyncIO) {
        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
        assert(MAX_IO_JOBS >= 2);
        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
        ctx->threadPoolActive = 1;
        if (!ctx->threadPool)
            EXM_THROW(104, "Failed creating I/O thread pool");
    }
}

/* AIO_IOPool_init:
 * Allocates and initializes a new I/O pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
    int i;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
    ctx->availableJobsCount = ctx->totalIoJobs;
    for(i=0; i < ctx->availableJobsCount; i++) {
        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
}


/* AIO_IOPool_threadPoolActive:
 * Checks whether the current operation uses the thread pool.
 * Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
    return ctx->threadPool && ctx->threadPoolActive;
}


/* AIO_IOPool_lockJobsMutex:
 * Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_unlockJobsMutex:
 * Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount < ctx->totalIoJobs);
    ctx->availableJobs[ctx->availableJobsCount++] = job;
    AIO_IOPool_unlockJobsMutex(ctx);
}

/* AIO_IOPool_join:
 * Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_setThreaded:
 * Allows (de)activating threaded mode, to be used when the expected overhead
 * of threading costs more than the expected gains. */
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
    assert(threaded == 0 || threaded == 1);
    assert(ctx != NULL);
    if(ctx->threadPoolActive != threaded) {
        AIO_IOPool_join(ctx);
        ctx->threadPoolActive = threaded;
    }
}

/* AIO_IOPool_destroy:
 * Releases a previously allocated IO thread pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
    int i;
    if(ctx->threadPool) {
        /* Make sure we finish all tasks and then free the resources */
        AIO_IOPool_join(ctx);
        /* Make sure we are not leaking availableJobs */
        assert(ctx->availableJobsCount == ctx->totalIoJobs);
        POOL_free(ctx->threadPool);
        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
    }
    assert(ctx->file == NULL);
    for(i=0; i<ctx->availableJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
        free(job->buffer);
        free(job);
    }
}

/* AIO_IOPool_acquireJob:
 * Returns an available io job to be used for a future io operation. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
    IOJob_t *job;
    assert(ctx->file != NULL || ctx->prefs->testMode);
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount > 0);
    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
    AIO_IOPool_unlockJobsMutex(ctx);
    job->usedBufferSize = 0;
    job->file = ctx->file;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_setFile:
 * Sets the file for future IO in the pool.
 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(ctx);
    assert(ctx->availableJobsCount == ctx->totalIoJobs);
    ctx->file = file;
}

static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
    return ctx->file;
}

/* AIO_IOPool_enqueueJob:
 * Enqueues an io job for execution.
 * When the thread pool is inactive the job executes synchronously on the calling thread.
 * The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_add(ctx->threadPool, ctx->poolFunction, job);
    else
        ctx->poolFunction(job);
}

/* ***********************************
 *  WritePool implementation
 *************************************/

/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
    return AIO_IOPool_acquireJob(&ctx->base);
}

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Queues a write job for execution and acquires a new one.
 * After the call, `*job` points to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before the call.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
    AIO_IOPool_enqueueJob(*job);
    *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
}
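
/* Usage sketch (illustrative only ; `produceChunk` is a hypothetical producer
 * that fills a buffer and reports how many bytes it wrote) :
 *
 *   IOJob_t* job = AIO_WritePool_acquireJob(ctx);
 *   while (produceChunk(job->buffer, job->bufferSize, &job->usedBufferSize)) {
 *       AIO_WritePool_enqueueAndReacquireWriteJob(&job);
 *   }
 *   AIO_WritePool_releaseIoJob(job);
 */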

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
    ctx->storedSkips = 0;
}

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
    AIO_IOPool_setFile(&ctx->base, file);
    assert(ctx->storedSkips == 0);
}

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_closeFile:
 * Ends sparse write, closes the writePool's current file, and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
    FILE* const dstFile = ctx->base.file;
    assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
    AIO_WritePool_sparseWriteEnd(ctx);
    AIO_IOPool_setFile(&ctx->base, NULL);
    return fclose(dstFile);
}

/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;
    return ctx;
}

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes the destination file if needed. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure we finish all tasks and then free the resources */
    if(AIO_WritePool_getFile(ctx))
        AIO_WritePool_closeFile(ctx);
    AIO_IOPool_destroy(&ctx->base);
    assert(ctx->storedSkips==0);
    free(ctx);
}

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}


/* ***********************************
 *  ReadPool implementation
 *************************************/
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int i;
    for(i=0; i<ctx->completedJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
        AIO_IOPool_releaseIoJob(job);
    }
    ctx->completedJobsCount = 0;
}

static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching waitingOnOffset and returns it;
 * if no such job is found, returns NULL.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}

/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
    return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * May block while waiting for the matching read to complete. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight, continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}
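
/* Example of the ordering guarantee above (illustrative) : with three reads of
 * bufferSize B completing out of order at offsets 2B, 0, B, the first call
 * blocks until the read at offset 0 arrives, and successive calls then return
 * the jobs in offset order 0, B, 2B regardless of completion order. */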


/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->reachedEof) {
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}

static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
    job->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += job->bufferSize;
    AIO_IOPool_enqueueJob(job);
}

static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while(ctx->base.availableJobsCount) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}

/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all currently enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time;
 * it will also be used as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

    /* coalesceBuffer is double-sized so leftover bytes can be joined with the next job's read */
    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
    if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->completedJobsCount = 0;
    ctx->currentJobHeld = NULL;

    if(ctx->base.threadPool)
        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103,"Failed creating jobCompletedCond cond");

    return ctx;
}

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes the source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    assert(n <= ctx->srcBufferLoaded);
    ctx->srcBufferLoaded -= n;
    ctx->srcBuffer += n;
}

/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
    return (IOJob_t*) ctx->currentJobHeld;
}

/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns when srcBuffer has at least the expected number of bytes loaded or when we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might hold up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t *job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;
    if(useCoalesce) {
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        ctx->srcBuffer = (U8 *) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}
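
/* Usage sketch (illustrative only ; `consumeChunk` is a hypothetical consumer
 * that processes up to the given number of bytes and returns how many it consumed) :
 *
 *   AIO_ReadPool_setFile(ctx, srcFile);
 *   for (;;) {
 *       AIO_ReadPool_fillBuffer(ctx, n);
 *       if (ctx->srcBufferLoaded == 0) break;   // end of file, nothing left
 *       AIO_ReadPool_consumeBytes(ctx, consumeChunk(ctx->srcBuffer, ctx->srcBufferLoaded));
 *   }
 *   AIO_ReadPool_closeFile(ctx);
 */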

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the rest of the current buffer and refills it with up to jobBufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    FILE* const file = AIO_ReadPool_getFile(ctx);
    AIO_ReadPool_setFile(ctx, NULL);
    return fclose(file);
}

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}