1*3117ece4Schristos /* 2*3117ece4Schristos * Copyright (c) Meta Platforms, Inc. and affiliates. 3*3117ece4Schristos * All rights reserved. 4*3117ece4Schristos * 5*3117ece4Schristos * This source code is licensed under both the BSD-style license (found in the 6*3117ece4Schristos * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7*3117ece4Schristos * in the COPYING file in the root directory of this source tree). 8*3117ece4Schristos * You may select, at your option, one of the above-listed licenses. 9*3117ece4Schristos */ 10*3117ece4Schristos 11*3117ece4Schristos 12*3117ece4Schristos /* ====== Dependencies ======= */ 13*3117ece4Schristos #include "../common/allocations.h" /* ZSTD_customCalloc, ZSTD_customFree */ 14*3117ece4Schristos #include "zstd_deps.h" /* size_t */ 15*3117ece4Schristos #include "debug.h" /* assert */ 16*3117ece4Schristos #include "pool.h" 17*3117ece4Schristos 18*3117ece4Schristos /* ====== Compiler specifics ====== */ 19*3117ece4Schristos #if defined(_MSC_VER) 20*3117ece4Schristos # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ 21*3117ece4Schristos #endif 22*3117ece4Schristos 23*3117ece4Schristos 24*3117ece4Schristos #ifdef ZSTD_MULTITHREAD 25*3117ece4Schristos 26*3117ece4Schristos #include "threading.h" /* pthread adaptation */ 27*3117ece4Schristos 28*3117ece4Schristos /* A job is a function and an opaque argument */ 29*3117ece4Schristos typedef struct POOL_job_s { 30*3117ece4Schristos POOL_function function; 31*3117ece4Schristos void *opaque; 32*3117ece4Schristos } POOL_job; 33*3117ece4Schristos 34*3117ece4Schristos struct POOL_ctx_s { 35*3117ece4Schristos ZSTD_customMem customMem; 36*3117ece4Schristos /* Keep track of the threads */ 37*3117ece4Schristos ZSTD_pthread_t* threads; 38*3117ece4Schristos size_t threadCapacity; 39*3117ece4Schristos size_t threadLimit; 40*3117ece4Schristos 41*3117ece4Schristos /* The queue is a circular buffer */ 42*3117ece4Schristos POOL_job *queue; 43*3117ece4Schristos size_t queueHead; 44*3117ece4Schristos size_t queueTail; 45*3117ece4Schristos size_t queueSize; 46*3117ece4Schristos 47*3117ece4Schristos /* The number of threads working on jobs */ 48*3117ece4Schristos size_t numThreadsBusy; 49*3117ece4Schristos /* Indicates if the queue is empty */ 50*3117ece4Schristos int queueEmpty; 51*3117ece4Schristos 52*3117ece4Schristos /* The mutex protects the queue */ 53*3117ece4Schristos ZSTD_pthread_mutex_t queueMutex; 54*3117ece4Schristos /* Condition variable for pushers to wait on when the queue is full */ 55*3117ece4Schristos ZSTD_pthread_cond_t queuePushCond; 56*3117ece4Schristos /* Condition variables for poppers to wait on when the queue is empty */ 57*3117ece4Schristos ZSTD_pthread_cond_t queuePopCond; 58*3117ece4Schristos /* Indicates if the queue is shutting down */ 59*3117ece4Schristos int shutdown; 60*3117ece4Schristos }; 61*3117ece4Schristos 62*3117ece4Schristos /* POOL_thread() : 63*3117ece4Schristos * Work thread for the thread pool. 64*3117ece4Schristos * Waits for jobs and executes them. 65*3117ece4Schristos * @returns : NULL on failure else non-null. 66*3117ece4Schristos */ 67*3117ece4Schristos static void* POOL_thread(void* opaque) { 68*3117ece4Schristos POOL_ctx* const ctx = (POOL_ctx*)opaque; 69*3117ece4Schristos if (!ctx) { return NULL; } 70*3117ece4Schristos for (;;) { 71*3117ece4Schristos /* Lock the mutex and wait for a non-empty queue or until shutdown */ 72*3117ece4Schristos ZSTD_pthread_mutex_lock(&ctx->queueMutex); 73*3117ece4Schristos 74*3117ece4Schristos while ( ctx->queueEmpty 75*3117ece4Schristos || (ctx->numThreadsBusy >= ctx->threadLimit) ) { 76*3117ece4Schristos if (ctx->shutdown) { 77*3117ece4Schristos /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit), 78*3117ece4Schristos * a few threads will be shutdown while !queueEmpty, 79*3117ece4Schristos * but enough threads will remain active to finish the queue */ 80*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 81*3117ece4Schristos return opaque; 82*3117ece4Schristos } 83*3117ece4Schristos ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); 84*3117ece4Schristos } 85*3117ece4Schristos /* Pop a job off the queue */ 86*3117ece4Schristos { POOL_job const job = ctx->queue[ctx->queueHead]; 87*3117ece4Schristos ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; 88*3117ece4Schristos ctx->numThreadsBusy++; 89*3117ece4Schristos ctx->queueEmpty = (ctx->queueHead == ctx->queueTail); 90*3117ece4Schristos /* Unlock the mutex, signal a pusher, and run the job */ 91*3117ece4Schristos ZSTD_pthread_cond_signal(&ctx->queuePushCond); 92*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 93*3117ece4Schristos 94*3117ece4Schristos job.function(job.opaque); 95*3117ece4Schristos 96*3117ece4Schristos /* If the intended queue size was 0, signal after finishing job */ 97*3117ece4Schristos ZSTD_pthread_mutex_lock(&ctx->queueMutex); 98*3117ece4Schristos ctx->numThreadsBusy--; 99*3117ece4Schristos ZSTD_pthread_cond_signal(&ctx->queuePushCond); 100*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 101*3117ece4Schristos } 102*3117ece4Schristos } /* for (;;) */ 103*3117ece4Schristos assert(0); /* Unreachable */ 104*3117ece4Schristos } 105*3117ece4Schristos 106*3117ece4Schristos /* ZSTD_createThreadPool() : public access point */ 107*3117ece4Schristos POOL_ctx* ZSTD_createThreadPool(size_t numThreads) { 108*3117ece4Schristos return POOL_create (numThreads, 0); 109*3117ece4Schristos } 110*3117ece4Schristos 111*3117ece4Schristos POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 112*3117ece4Schristos return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 113*3117ece4Schristos } 114*3117ece4Schristos 115*3117ece4Schristos POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, 116*3117ece4Schristos ZSTD_customMem customMem) 117*3117ece4Schristos { 118*3117ece4Schristos POOL_ctx* ctx; 119*3117ece4Schristos /* Check parameters */ 120*3117ece4Schristos if (!numThreads) { return NULL; } 121*3117ece4Schristos /* Allocate the context and zero initialize */ 122*3117ece4Schristos ctx = (POOL_ctx*)ZSTD_customCalloc(sizeof(POOL_ctx), customMem); 123*3117ece4Schristos if (!ctx) { return NULL; } 124*3117ece4Schristos /* Initialize the job queue. 125*3117ece4Schristos * It needs one extra space since one space is wasted to differentiate 126*3117ece4Schristos * empty and full queues. 127*3117ece4Schristos */ 128*3117ece4Schristos ctx->queueSize = queueSize + 1; 129*3117ece4Schristos ctx->queue = (POOL_job*)ZSTD_customCalloc(ctx->queueSize * sizeof(POOL_job), customMem); 130*3117ece4Schristos ctx->queueHead = 0; 131*3117ece4Schristos ctx->queueTail = 0; 132*3117ece4Schristos ctx->numThreadsBusy = 0; 133*3117ece4Schristos ctx->queueEmpty = 1; 134*3117ece4Schristos { 135*3117ece4Schristos int error = 0; 136*3117ece4Schristos error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL); 137*3117ece4Schristos error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL); 138*3117ece4Schristos error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); 139*3117ece4Schristos if (error) { POOL_free(ctx); return NULL; } 140*3117ece4Schristos } 141*3117ece4Schristos ctx->shutdown = 0; 142*3117ece4Schristos /* Allocate space for the thread handles */ 143*3117ece4Schristos ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem); 144*3117ece4Schristos ctx->threadCapacity = 0; 145*3117ece4Schristos ctx->customMem = customMem; 146*3117ece4Schristos /* Check for errors */ 147*3117ece4Schristos if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } 148*3117ece4Schristos /* Initialize the threads */ 149*3117ece4Schristos { size_t i; 150*3117ece4Schristos for (i = 0; i < numThreads; ++i) { 151*3117ece4Schristos if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { 152*3117ece4Schristos ctx->threadCapacity = i; 153*3117ece4Schristos POOL_free(ctx); 154*3117ece4Schristos return NULL; 155*3117ece4Schristos } } 156*3117ece4Schristos ctx->threadCapacity = numThreads; 157*3117ece4Schristos ctx->threadLimit = numThreads; 158*3117ece4Schristos } 159*3117ece4Schristos return ctx; 160*3117ece4Schristos } 161*3117ece4Schristos 162*3117ece4Schristos /*! POOL_join() : 163*3117ece4Schristos Shutdown the queue, wake any sleeping threads, and join all of the threads. 164*3117ece4Schristos */ 165*3117ece4Schristos static void POOL_join(POOL_ctx* ctx) { 166*3117ece4Schristos /* Shut down the queue */ 167*3117ece4Schristos ZSTD_pthread_mutex_lock(&ctx->queueMutex); 168*3117ece4Schristos ctx->shutdown = 1; 169*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 170*3117ece4Schristos /* Wake up sleeping threads */ 171*3117ece4Schristos ZSTD_pthread_cond_broadcast(&ctx->queuePushCond); 172*3117ece4Schristos ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); 173*3117ece4Schristos /* Join all of the threads */ 174*3117ece4Schristos { size_t i; 175*3117ece4Schristos for (i = 0; i < ctx->threadCapacity; ++i) { 176*3117ece4Schristos ZSTD_pthread_join(ctx->threads[i]); /* note : could fail */ 177*3117ece4Schristos } } 178*3117ece4Schristos } 179*3117ece4Schristos 180*3117ece4Schristos void POOL_free(POOL_ctx *ctx) { 181*3117ece4Schristos if (!ctx) { return; } 182*3117ece4Schristos POOL_join(ctx); 183*3117ece4Schristos ZSTD_pthread_mutex_destroy(&ctx->queueMutex); 184*3117ece4Schristos ZSTD_pthread_cond_destroy(&ctx->queuePushCond); 185*3117ece4Schristos ZSTD_pthread_cond_destroy(&ctx->queuePopCond); 186*3117ece4Schristos ZSTD_customFree(ctx->queue, ctx->customMem); 187*3117ece4Schristos ZSTD_customFree(ctx->threads, ctx->customMem); 188*3117ece4Schristos ZSTD_customFree(ctx, ctx->customMem); 189*3117ece4Schristos } 190*3117ece4Schristos 191*3117ece4Schristos /*! POOL_joinJobs() : 192*3117ece4Schristos * Waits for all queued jobs to finish executing. 193*3117ece4Schristos */ 194*3117ece4Schristos void POOL_joinJobs(POOL_ctx* ctx) { 195*3117ece4Schristos ZSTD_pthread_mutex_lock(&ctx->queueMutex); 196*3117ece4Schristos while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) { 197*3117ece4Schristos ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); 198*3117ece4Schristos } 199*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 200*3117ece4Schristos } 201*3117ece4Schristos 202*3117ece4Schristos void ZSTD_freeThreadPool (ZSTD_threadPool* pool) { 203*3117ece4Schristos POOL_free (pool); 204*3117ece4Schristos } 205*3117ece4Schristos 206*3117ece4Schristos size_t POOL_sizeof(const POOL_ctx* ctx) { 207*3117ece4Schristos if (ctx==NULL) return 0; /* supports sizeof NULL */ 208*3117ece4Schristos return sizeof(*ctx) 209*3117ece4Schristos + ctx->queueSize * sizeof(POOL_job) 210*3117ece4Schristos + ctx->threadCapacity * sizeof(ZSTD_pthread_t); 211*3117ece4Schristos } 212*3117ece4Schristos 213*3117ece4Schristos 214*3117ece4Schristos /* @return : 0 on success, 1 on error */ 215*3117ece4Schristos static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) 216*3117ece4Schristos { 217*3117ece4Schristos if (numThreads <= ctx->threadCapacity) { 218*3117ece4Schristos if (!numThreads) return 1; 219*3117ece4Schristos ctx->threadLimit = numThreads; 220*3117ece4Schristos return 0; 221*3117ece4Schristos } 222*3117ece4Schristos /* numThreads > threadCapacity */ 223*3117ece4Schristos { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); 224*3117ece4Schristos if (!threadPool) return 1; 225*3117ece4Schristos /* replace existing thread pool */ 226*3117ece4Schristos ZSTD_memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(ZSTD_pthread_t)); 227*3117ece4Schristos ZSTD_customFree(ctx->threads, ctx->customMem); 228*3117ece4Schristos ctx->threads = threadPool; 229*3117ece4Schristos /* Initialize additional threads */ 230*3117ece4Schristos { size_t threadId; 231*3117ece4Schristos for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) { 232*3117ece4Schristos if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) { 233*3117ece4Schristos ctx->threadCapacity = threadId; 234*3117ece4Schristos return 1; 235*3117ece4Schristos } } 236*3117ece4Schristos } } 237*3117ece4Schristos /* successfully expanded */ 238*3117ece4Schristos ctx->threadCapacity = numThreads; 239*3117ece4Schristos ctx->threadLimit = numThreads; 240*3117ece4Schristos return 0; 241*3117ece4Schristos } 242*3117ece4Schristos 243*3117ece4Schristos /* @return : 0 on success, 1 on error */ 244*3117ece4Schristos int POOL_resize(POOL_ctx* ctx, size_t numThreads) 245*3117ece4Schristos { 246*3117ece4Schristos int result; 247*3117ece4Schristos if (ctx==NULL) return 1; 248*3117ece4Schristos ZSTD_pthread_mutex_lock(&ctx->queueMutex); 249*3117ece4Schristos result = POOL_resize_internal(ctx, numThreads); 250*3117ece4Schristos ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); 251*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 252*3117ece4Schristos return result; 253*3117ece4Schristos } 254*3117ece4Schristos 255*3117ece4Schristos /** 256*3117ece4Schristos * Returns 1 if the queue is full and 0 otherwise. 257*3117ece4Schristos * 258*3117ece4Schristos * When queueSize is 1 (pool was created with an intended queueSize of 0), 259*3117ece4Schristos * then a queue is empty if there is a thread free _and_ no job is waiting. 260*3117ece4Schristos */ 261*3117ece4Schristos static int isQueueFull(POOL_ctx const* ctx) { 262*3117ece4Schristos if (ctx->queueSize > 1) { 263*3117ece4Schristos return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); 264*3117ece4Schristos } else { 265*3117ece4Schristos return (ctx->numThreadsBusy == ctx->threadLimit) || 266*3117ece4Schristos !ctx->queueEmpty; 267*3117ece4Schristos } 268*3117ece4Schristos } 269*3117ece4Schristos 270*3117ece4Schristos 271*3117ece4Schristos static void 272*3117ece4Schristos POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) 273*3117ece4Schristos { 274*3117ece4Schristos POOL_job job; 275*3117ece4Schristos job.function = function; 276*3117ece4Schristos job.opaque = opaque; 277*3117ece4Schristos assert(ctx != NULL); 278*3117ece4Schristos if (ctx->shutdown) return; 279*3117ece4Schristos 280*3117ece4Schristos ctx->queueEmpty = 0; 281*3117ece4Schristos ctx->queue[ctx->queueTail] = job; 282*3117ece4Schristos ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; 283*3117ece4Schristos ZSTD_pthread_cond_signal(&ctx->queuePopCond); 284*3117ece4Schristos } 285*3117ece4Schristos 286*3117ece4Schristos void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) 287*3117ece4Schristos { 288*3117ece4Schristos assert(ctx != NULL); 289*3117ece4Schristos ZSTD_pthread_mutex_lock(&ctx->queueMutex); 290*3117ece4Schristos /* Wait until there is space in the queue for the new job */ 291*3117ece4Schristos while (isQueueFull(ctx) && (!ctx->shutdown)) { 292*3117ece4Schristos ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); 293*3117ece4Schristos } 294*3117ece4Schristos POOL_add_internal(ctx, function, opaque); 295*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 296*3117ece4Schristos } 297*3117ece4Schristos 298*3117ece4Schristos 299*3117ece4Schristos int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) 300*3117ece4Schristos { 301*3117ece4Schristos assert(ctx != NULL); 302*3117ece4Schristos ZSTD_pthread_mutex_lock(&ctx->queueMutex); 303*3117ece4Schristos if (isQueueFull(ctx)) { 304*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 305*3117ece4Schristos return 0; 306*3117ece4Schristos } 307*3117ece4Schristos POOL_add_internal(ctx, function, opaque); 308*3117ece4Schristos ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 309*3117ece4Schristos return 1; 310*3117ece4Schristos } 311*3117ece4Schristos 312*3117ece4Schristos 313*3117ece4Schristos #else /* ZSTD_MULTITHREAD not defined */ 314*3117ece4Schristos 315*3117ece4Schristos /* ========================== */ 316*3117ece4Schristos /* No multi-threading support */ 317*3117ece4Schristos /* ========================== */ 318*3117ece4Schristos 319*3117ece4Schristos 320*3117ece4Schristos /* We don't need any data, but if it is empty, malloc() might return NULL. */ 321*3117ece4Schristos struct POOL_ctx_s { 322*3117ece4Schristos int dummy; 323*3117ece4Schristos }; 324*3117ece4Schristos static POOL_ctx g_poolCtx; 325*3117ece4Schristos 326*3117ece4Schristos POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 327*3117ece4Schristos return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 328*3117ece4Schristos } 329*3117ece4Schristos 330*3117ece4Schristos POOL_ctx* 331*3117ece4Schristos POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) 332*3117ece4Schristos { 333*3117ece4Schristos (void)numThreads; 334*3117ece4Schristos (void)queueSize; 335*3117ece4Schristos (void)customMem; 336*3117ece4Schristos return &g_poolCtx; 337*3117ece4Schristos } 338*3117ece4Schristos 339*3117ece4Schristos void POOL_free(POOL_ctx* ctx) { 340*3117ece4Schristos assert(!ctx || ctx == &g_poolCtx); 341*3117ece4Schristos (void)ctx; 342*3117ece4Schristos } 343*3117ece4Schristos 344*3117ece4Schristos void POOL_joinJobs(POOL_ctx* ctx){ 345*3117ece4Schristos assert(!ctx || ctx == &g_poolCtx); 346*3117ece4Schristos (void)ctx; 347*3117ece4Schristos } 348*3117ece4Schristos 349*3117ece4Schristos int POOL_resize(POOL_ctx* ctx, size_t numThreads) { 350*3117ece4Schristos (void)ctx; (void)numThreads; 351*3117ece4Schristos return 0; 352*3117ece4Schristos } 353*3117ece4Schristos 354*3117ece4Schristos void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) { 355*3117ece4Schristos (void)ctx; 356*3117ece4Schristos function(opaque); 357*3117ece4Schristos } 358*3117ece4Schristos 359*3117ece4Schristos int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) { 360*3117ece4Schristos (void)ctx; 361*3117ece4Schristos function(opaque); 362*3117ece4Schristos return 1; 363*3117ece4Schristos } 364*3117ece4Schristos 365*3117ece4Schristos size_t POOL_sizeof(const POOL_ctx* ctx) { 366*3117ece4Schristos if (ctx==NULL) return 0; /* supports sizeof NULL */ 367*3117ece4Schristos assert(ctx == &g_poolCtx); 368*3117ece4Schristos return sizeof(*ctx); 369*3117ece4Schristos } 370*3117ece4Schristos 371*3117ece4Schristos #endif /* ZSTD_MULTITHREAD */ 372