xref: /netbsd-src/external/bsd/zstd/dist/lib/common/pool.c (revision 3117ece4fc4a4ca4489ba793710b60b0d26bab6c)
1*3117ece4Schristos /*
2*3117ece4Schristos  * Copyright (c) Meta Platforms, Inc. and affiliates.
3*3117ece4Schristos  * All rights reserved.
4*3117ece4Schristos  *
5*3117ece4Schristos  * This source code is licensed under both the BSD-style license (found in the
6*3117ece4Schristos  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7*3117ece4Schristos  * in the COPYING file in the root directory of this source tree).
8*3117ece4Schristos  * You may select, at your option, one of the above-listed licenses.
9*3117ece4Schristos  */
10*3117ece4Schristos 
11*3117ece4Schristos 
12*3117ece4Schristos /* ======   Dependencies   ======= */
13*3117ece4Schristos #include "../common/allocations.h"  /* ZSTD_customCalloc, ZSTD_customFree */
14*3117ece4Schristos #include "zstd_deps.h" /* size_t */
15*3117ece4Schristos #include "debug.h"     /* assert */
16*3117ece4Schristos #include "pool.h"
17*3117ece4Schristos 
18*3117ece4Schristos /* ======   Compiler specifics   ====== */
19*3117ece4Schristos #if defined(_MSC_VER)
20*3117ece4Schristos #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
21*3117ece4Schristos #endif
22*3117ece4Schristos 
23*3117ece4Schristos 
24*3117ece4Schristos #ifdef ZSTD_MULTITHREAD
25*3117ece4Schristos 
26*3117ece4Schristos #include "threading.h"   /* pthread adaptation */
27*3117ece4Schristos 
28*3117ece4Schristos /* A job is a function and an opaque argument */
29*3117ece4Schristos typedef struct POOL_job_s {
30*3117ece4Schristos     POOL_function function;
31*3117ece4Schristos     void *opaque;
32*3117ece4Schristos } POOL_job;
33*3117ece4Schristos 
34*3117ece4Schristos struct POOL_ctx_s {
35*3117ece4Schristos     ZSTD_customMem customMem;
36*3117ece4Schristos     /* Keep track of the threads */
37*3117ece4Schristos     ZSTD_pthread_t* threads;
38*3117ece4Schristos     size_t threadCapacity;
39*3117ece4Schristos     size_t threadLimit;
40*3117ece4Schristos 
41*3117ece4Schristos     /* The queue is a circular buffer */
42*3117ece4Schristos     POOL_job *queue;
43*3117ece4Schristos     size_t queueHead;
44*3117ece4Schristos     size_t queueTail;
45*3117ece4Schristos     size_t queueSize;
46*3117ece4Schristos 
47*3117ece4Schristos     /* The number of threads working on jobs */
48*3117ece4Schristos     size_t numThreadsBusy;
49*3117ece4Schristos     /* Indicates if the queue is empty */
50*3117ece4Schristos     int queueEmpty;
51*3117ece4Schristos 
52*3117ece4Schristos     /* The mutex protects the queue */
53*3117ece4Schristos     ZSTD_pthread_mutex_t queueMutex;
54*3117ece4Schristos     /* Condition variable for pushers to wait on when the queue is full */
55*3117ece4Schristos     ZSTD_pthread_cond_t queuePushCond;
56*3117ece4Schristos     /* Condition variables for poppers to wait on when the queue is empty */
57*3117ece4Schristos     ZSTD_pthread_cond_t queuePopCond;
58*3117ece4Schristos     /* Indicates if the queue is shutting down */
59*3117ece4Schristos     int shutdown;
60*3117ece4Schristos };
61*3117ece4Schristos 
62*3117ece4Schristos /* POOL_thread() :
63*3117ece4Schristos  * Work thread for the thread pool.
64*3117ece4Schristos  * Waits for jobs and executes them.
65*3117ece4Schristos  * @returns : NULL on failure else non-null.
66*3117ece4Schristos  */
67*3117ece4Schristos static void* POOL_thread(void* opaque) {
68*3117ece4Schristos     POOL_ctx* const ctx = (POOL_ctx*)opaque;
69*3117ece4Schristos     if (!ctx) { return NULL; }
70*3117ece4Schristos     for (;;) {
71*3117ece4Schristos         /* Lock the mutex and wait for a non-empty queue or until shutdown */
72*3117ece4Schristos         ZSTD_pthread_mutex_lock(&ctx->queueMutex);
73*3117ece4Schristos 
74*3117ece4Schristos         while ( ctx->queueEmpty
75*3117ece4Schristos             || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
76*3117ece4Schristos             if (ctx->shutdown) {
77*3117ece4Schristos                 /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
78*3117ece4Schristos                  * a few threads will be shutdown while !queueEmpty,
79*3117ece4Schristos                  * but enough threads will remain active to finish the queue */
80*3117ece4Schristos                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
81*3117ece4Schristos                 return opaque;
82*3117ece4Schristos             }
83*3117ece4Schristos             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
84*3117ece4Schristos         }
85*3117ece4Schristos         /* Pop a job off the queue */
86*3117ece4Schristos         {   POOL_job const job = ctx->queue[ctx->queueHead];
87*3117ece4Schristos             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
88*3117ece4Schristos             ctx->numThreadsBusy++;
89*3117ece4Schristos             ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
90*3117ece4Schristos             /* Unlock the mutex, signal a pusher, and run the job */
91*3117ece4Schristos             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
92*3117ece4Schristos             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
93*3117ece4Schristos 
94*3117ece4Schristos             job.function(job.opaque);
95*3117ece4Schristos 
96*3117ece4Schristos             /* If the intended queue size was 0, signal after finishing job */
97*3117ece4Schristos             ZSTD_pthread_mutex_lock(&ctx->queueMutex);
98*3117ece4Schristos             ctx->numThreadsBusy--;
99*3117ece4Schristos             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
100*3117ece4Schristos             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
101*3117ece4Schristos         }
102*3117ece4Schristos     }  /* for (;;) */
103*3117ece4Schristos     assert(0);  /* Unreachable */
104*3117ece4Schristos }
105*3117ece4Schristos 
106*3117ece4Schristos /* ZSTD_createThreadPool() : public access point */
107*3117ece4Schristos POOL_ctx* ZSTD_createThreadPool(size_t numThreads) {
108*3117ece4Schristos     return POOL_create (numThreads, 0);
109*3117ece4Schristos }
110*3117ece4Schristos 
111*3117ece4Schristos POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
112*3117ece4Schristos     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
113*3117ece4Schristos }
114*3117ece4Schristos 
115*3117ece4Schristos POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
116*3117ece4Schristos                                ZSTD_customMem customMem)
117*3117ece4Schristos {
118*3117ece4Schristos     POOL_ctx* ctx;
119*3117ece4Schristos     /* Check parameters */
120*3117ece4Schristos     if (!numThreads) { return NULL; }
121*3117ece4Schristos     /* Allocate the context and zero initialize */
122*3117ece4Schristos     ctx = (POOL_ctx*)ZSTD_customCalloc(sizeof(POOL_ctx), customMem);
123*3117ece4Schristos     if (!ctx) { return NULL; }
124*3117ece4Schristos     /* Initialize the job queue.
125*3117ece4Schristos      * It needs one extra space since one space is wasted to differentiate
126*3117ece4Schristos      * empty and full queues.
127*3117ece4Schristos      */
128*3117ece4Schristos     ctx->queueSize = queueSize + 1;
129*3117ece4Schristos     ctx->queue = (POOL_job*)ZSTD_customCalloc(ctx->queueSize * sizeof(POOL_job), customMem);
130*3117ece4Schristos     ctx->queueHead = 0;
131*3117ece4Schristos     ctx->queueTail = 0;
132*3117ece4Schristos     ctx->numThreadsBusy = 0;
133*3117ece4Schristos     ctx->queueEmpty = 1;
134*3117ece4Schristos     {
135*3117ece4Schristos         int error = 0;
136*3117ece4Schristos         error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
137*3117ece4Schristos         error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
138*3117ece4Schristos         error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
139*3117ece4Schristos         if (error) { POOL_free(ctx); return NULL; }
140*3117ece4Schristos     }
141*3117ece4Schristos     ctx->shutdown = 0;
142*3117ece4Schristos     /* Allocate space for the thread handles */
143*3117ece4Schristos     ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
144*3117ece4Schristos     ctx->threadCapacity = 0;
145*3117ece4Schristos     ctx->customMem = customMem;
146*3117ece4Schristos     /* Check for errors */
147*3117ece4Schristos     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
148*3117ece4Schristos     /* Initialize the threads */
149*3117ece4Schristos     {   size_t i;
150*3117ece4Schristos         for (i = 0; i < numThreads; ++i) {
151*3117ece4Schristos             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
152*3117ece4Schristos                 ctx->threadCapacity = i;
153*3117ece4Schristos                 POOL_free(ctx);
154*3117ece4Schristos                 return NULL;
155*3117ece4Schristos         }   }
156*3117ece4Schristos         ctx->threadCapacity = numThreads;
157*3117ece4Schristos         ctx->threadLimit = numThreads;
158*3117ece4Schristos     }
159*3117ece4Schristos     return ctx;
160*3117ece4Schristos }
161*3117ece4Schristos 
162*3117ece4Schristos /*! POOL_join() :
163*3117ece4Schristos     Shutdown the queue, wake any sleeping threads, and join all of the threads.
164*3117ece4Schristos */
165*3117ece4Schristos static void POOL_join(POOL_ctx* ctx) {
166*3117ece4Schristos     /* Shut down the queue */
167*3117ece4Schristos     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
168*3117ece4Schristos     ctx->shutdown = 1;
169*3117ece4Schristos     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
170*3117ece4Schristos     /* Wake up sleeping threads */
171*3117ece4Schristos     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
172*3117ece4Schristos     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
173*3117ece4Schristos     /* Join all of the threads */
174*3117ece4Schristos     {   size_t i;
175*3117ece4Schristos         for (i = 0; i < ctx->threadCapacity; ++i) {
176*3117ece4Schristos             ZSTD_pthread_join(ctx->threads[i]);  /* note : could fail */
177*3117ece4Schristos     }   }
178*3117ece4Schristos }
179*3117ece4Schristos 
180*3117ece4Schristos void POOL_free(POOL_ctx *ctx) {
181*3117ece4Schristos     if (!ctx) { return; }
182*3117ece4Schristos     POOL_join(ctx);
183*3117ece4Schristos     ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
184*3117ece4Schristos     ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
185*3117ece4Schristos     ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
186*3117ece4Schristos     ZSTD_customFree(ctx->queue, ctx->customMem);
187*3117ece4Schristos     ZSTD_customFree(ctx->threads, ctx->customMem);
188*3117ece4Schristos     ZSTD_customFree(ctx, ctx->customMem);
189*3117ece4Schristos }
190*3117ece4Schristos 
191*3117ece4Schristos /*! POOL_joinJobs() :
192*3117ece4Schristos  *  Waits for all queued jobs to finish executing.
193*3117ece4Schristos  */
194*3117ece4Schristos void POOL_joinJobs(POOL_ctx* ctx) {
195*3117ece4Schristos     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
196*3117ece4Schristos     while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
197*3117ece4Schristos         ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
198*3117ece4Schristos     }
199*3117ece4Schristos     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
200*3117ece4Schristos }
201*3117ece4Schristos 
202*3117ece4Schristos void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
203*3117ece4Schristos   POOL_free (pool);
204*3117ece4Schristos }
205*3117ece4Schristos 
206*3117ece4Schristos size_t POOL_sizeof(const POOL_ctx* ctx) {
207*3117ece4Schristos     if (ctx==NULL) return 0;  /* supports sizeof NULL */
208*3117ece4Schristos     return sizeof(*ctx)
209*3117ece4Schristos         + ctx->queueSize * sizeof(POOL_job)
210*3117ece4Schristos         + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
211*3117ece4Schristos }
212*3117ece4Schristos 
213*3117ece4Schristos 
214*3117ece4Schristos /* @return : 0 on success, 1 on error */
215*3117ece4Schristos static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
216*3117ece4Schristos {
217*3117ece4Schristos     if (numThreads <= ctx->threadCapacity) {
218*3117ece4Schristos         if (!numThreads) return 1;
219*3117ece4Schristos         ctx->threadLimit = numThreads;
220*3117ece4Schristos         return 0;
221*3117ece4Schristos     }
222*3117ece4Schristos     /* numThreads > threadCapacity */
223*3117ece4Schristos     {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
224*3117ece4Schristos         if (!threadPool) return 1;
225*3117ece4Schristos         /* replace existing thread pool */
226*3117ece4Schristos         ZSTD_memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(ZSTD_pthread_t));
227*3117ece4Schristos         ZSTD_customFree(ctx->threads, ctx->customMem);
228*3117ece4Schristos         ctx->threads = threadPool;
229*3117ece4Schristos         /* Initialize additional threads */
230*3117ece4Schristos         {   size_t threadId;
231*3117ece4Schristos             for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
232*3117ece4Schristos                 if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
233*3117ece4Schristos                     ctx->threadCapacity = threadId;
234*3117ece4Schristos                     return 1;
235*3117ece4Schristos             }   }
236*3117ece4Schristos     }   }
237*3117ece4Schristos     /* successfully expanded */
238*3117ece4Schristos     ctx->threadCapacity = numThreads;
239*3117ece4Schristos     ctx->threadLimit = numThreads;
240*3117ece4Schristos     return 0;
241*3117ece4Schristos }
242*3117ece4Schristos 
243*3117ece4Schristos /* @return : 0 on success, 1 on error */
244*3117ece4Schristos int POOL_resize(POOL_ctx* ctx, size_t numThreads)
245*3117ece4Schristos {
246*3117ece4Schristos     int result;
247*3117ece4Schristos     if (ctx==NULL) return 1;
248*3117ece4Schristos     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
249*3117ece4Schristos     result = POOL_resize_internal(ctx, numThreads);
250*3117ece4Schristos     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
251*3117ece4Schristos     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
252*3117ece4Schristos     return result;
253*3117ece4Schristos }
254*3117ece4Schristos 
255*3117ece4Schristos /**
256*3117ece4Schristos  * Returns 1 if the queue is full and 0 otherwise.
257*3117ece4Schristos  *
258*3117ece4Schristos  * When queueSize is 1 (pool was created with an intended queueSize of 0),
259*3117ece4Schristos  * then a queue is empty if there is a thread free _and_ no job is waiting.
260*3117ece4Schristos  */
261*3117ece4Schristos static int isQueueFull(POOL_ctx const* ctx) {
262*3117ece4Schristos     if (ctx->queueSize > 1) {
263*3117ece4Schristos         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
264*3117ece4Schristos     } else {
265*3117ece4Schristos         return (ctx->numThreadsBusy == ctx->threadLimit) ||
266*3117ece4Schristos                !ctx->queueEmpty;
267*3117ece4Schristos     }
268*3117ece4Schristos }
269*3117ece4Schristos 
270*3117ece4Schristos 
271*3117ece4Schristos static void
272*3117ece4Schristos POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
273*3117ece4Schristos {
274*3117ece4Schristos     POOL_job job;
275*3117ece4Schristos     job.function = function;
276*3117ece4Schristos     job.opaque = opaque;
277*3117ece4Schristos     assert(ctx != NULL);
278*3117ece4Schristos     if (ctx->shutdown) return;
279*3117ece4Schristos 
280*3117ece4Schristos     ctx->queueEmpty = 0;
281*3117ece4Schristos     ctx->queue[ctx->queueTail] = job;
282*3117ece4Schristos     ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
283*3117ece4Schristos     ZSTD_pthread_cond_signal(&ctx->queuePopCond);
284*3117ece4Schristos }
285*3117ece4Schristos 
286*3117ece4Schristos void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
287*3117ece4Schristos {
288*3117ece4Schristos     assert(ctx != NULL);
289*3117ece4Schristos     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
290*3117ece4Schristos     /* Wait until there is space in the queue for the new job */
291*3117ece4Schristos     while (isQueueFull(ctx) && (!ctx->shutdown)) {
292*3117ece4Schristos         ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
293*3117ece4Schristos     }
294*3117ece4Schristos     POOL_add_internal(ctx, function, opaque);
295*3117ece4Schristos     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
296*3117ece4Schristos }
297*3117ece4Schristos 
298*3117ece4Schristos 
299*3117ece4Schristos int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
300*3117ece4Schristos {
301*3117ece4Schristos     assert(ctx != NULL);
302*3117ece4Schristos     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
303*3117ece4Schristos     if (isQueueFull(ctx)) {
304*3117ece4Schristos         ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
305*3117ece4Schristos         return 0;
306*3117ece4Schristos     }
307*3117ece4Schristos     POOL_add_internal(ctx, function, opaque);
308*3117ece4Schristos     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
309*3117ece4Schristos     return 1;
310*3117ece4Schristos }
311*3117ece4Schristos 
312*3117ece4Schristos 
313*3117ece4Schristos #else  /* ZSTD_MULTITHREAD  not defined */
314*3117ece4Schristos 
315*3117ece4Schristos /* ========================== */
316*3117ece4Schristos /* No multi-threading support */
317*3117ece4Schristos /* ========================== */
318*3117ece4Schristos 
319*3117ece4Schristos 
320*3117ece4Schristos /* We don't need any data, but if it is empty, malloc() might return NULL. */
321*3117ece4Schristos struct POOL_ctx_s {
322*3117ece4Schristos     int dummy;
323*3117ece4Schristos };
324*3117ece4Schristos static POOL_ctx g_poolCtx;
325*3117ece4Schristos 
326*3117ece4Schristos POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
327*3117ece4Schristos     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
328*3117ece4Schristos }
329*3117ece4Schristos 
330*3117ece4Schristos POOL_ctx*
331*3117ece4Schristos POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem)
332*3117ece4Schristos {
333*3117ece4Schristos     (void)numThreads;
334*3117ece4Schristos     (void)queueSize;
335*3117ece4Schristos     (void)customMem;
336*3117ece4Schristos     return &g_poolCtx;
337*3117ece4Schristos }
338*3117ece4Schristos 
339*3117ece4Schristos void POOL_free(POOL_ctx* ctx) {
340*3117ece4Schristos     assert(!ctx || ctx == &g_poolCtx);
341*3117ece4Schristos     (void)ctx;
342*3117ece4Schristos }
343*3117ece4Schristos 
344*3117ece4Schristos void POOL_joinJobs(POOL_ctx* ctx){
345*3117ece4Schristos     assert(!ctx || ctx == &g_poolCtx);
346*3117ece4Schristos     (void)ctx;
347*3117ece4Schristos }
348*3117ece4Schristos 
349*3117ece4Schristos int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
350*3117ece4Schristos     (void)ctx; (void)numThreads;
351*3117ece4Schristos     return 0;
352*3117ece4Schristos }
353*3117ece4Schristos 
354*3117ece4Schristos void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
355*3117ece4Schristos     (void)ctx;
356*3117ece4Schristos     function(opaque);
357*3117ece4Schristos }
358*3117ece4Schristos 
359*3117ece4Schristos int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
360*3117ece4Schristos     (void)ctx;
361*3117ece4Schristos     function(opaque);
362*3117ece4Schristos     return 1;
363*3117ece4Schristos }
364*3117ece4Schristos 
365*3117ece4Schristos size_t POOL_sizeof(const POOL_ctx* ctx) {
366*3117ece4Schristos     if (ctx==NULL) return 0;  /* supports sizeof NULL */
367*3117ece4Schristos     assert(ctx == &g_poolCtx);
368*3117ece4Schristos     return sizeof(*ctx);
369*3117ece4Schristos }
370*3117ece4Schristos 
371*3117ece4Schristos #endif  /* ZSTD_MULTITHREAD */
372