1*15ab8c86SJohn Marino /////////////////////////////////////////////////////////////////////////////// 2*15ab8c86SJohn Marino // 3*15ab8c86SJohn Marino /// \file stream_encoder_mt.c 4*15ab8c86SJohn Marino /// \brief Multithreaded .xz Stream encoder 5*15ab8c86SJohn Marino // 6*15ab8c86SJohn Marino // Author: Lasse Collin 7*15ab8c86SJohn Marino // 8*15ab8c86SJohn Marino // This file has been put into the public domain. 9*15ab8c86SJohn Marino // You can do whatever you want with this file. 10*15ab8c86SJohn Marino // 11*15ab8c86SJohn Marino /////////////////////////////////////////////////////////////////////////////// 12*15ab8c86SJohn Marino 13*15ab8c86SJohn Marino #include "filter_encoder.h" 14*15ab8c86SJohn Marino #include "easy_preset.h" 15*15ab8c86SJohn Marino #include "block_encoder.h" 16*15ab8c86SJohn Marino #include "block_buffer_encoder.h" 17*15ab8c86SJohn Marino #include "index_encoder.h" 18*15ab8c86SJohn Marino #include "outqueue.h" 19*15ab8c86SJohn Marino 20*15ab8c86SJohn Marino 21*15ab8c86SJohn Marino /// Maximum supported block size. This makes it simpler to prevent integer 22*15ab8c86SJohn Marino /// overflows if we are given unusually large block size. 23*15ab8c86SJohn Marino #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX) 24*15ab8c86SJohn Marino 25*15ab8c86SJohn Marino 26*15ab8c86SJohn Marino typedef enum { 27*15ab8c86SJohn Marino /// Waiting for work. 28*15ab8c86SJohn Marino THR_IDLE, 29*15ab8c86SJohn Marino 30*15ab8c86SJohn Marino /// Encoding is in progress. 31*15ab8c86SJohn Marino THR_RUN, 32*15ab8c86SJohn Marino 33*15ab8c86SJohn Marino /// Encoding is in progress but no more input data will 34*15ab8c86SJohn Marino /// be read. 35*15ab8c86SJohn Marino THR_FINISH, 36*15ab8c86SJohn Marino 37*15ab8c86SJohn Marino /// The main thread wants the thread to stop whatever it was doing 38*15ab8c86SJohn Marino /// but not exit. 39*15ab8c86SJohn Marino THR_STOP, 40*15ab8c86SJohn Marino 41*15ab8c86SJohn Marino /// The main thread wants the thread to exit. We could use 42*15ab8c86SJohn Marino /// cancellation but since there's stopped anyway, this is lazier. 43*15ab8c86SJohn Marino THR_EXIT, 44*15ab8c86SJohn Marino 45*15ab8c86SJohn Marino } worker_state; 46*15ab8c86SJohn Marino 47*15ab8c86SJohn Marino 48*15ab8c86SJohn Marino typedef struct worker_thread_s worker_thread; 49*15ab8c86SJohn Marino struct worker_thread_s { 50*15ab8c86SJohn Marino worker_state state; 51*15ab8c86SJohn Marino 52*15ab8c86SJohn Marino /// Input buffer of coder->block_size bytes. The main thread will 53*15ab8c86SJohn Marino /// put new input into this and update in_size accordingly. Once 54*15ab8c86SJohn Marino /// no more input is coming, state will be set to THR_FINISH. 55*15ab8c86SJohn Marino uint8_t *in; 56*15ab8c86SJohn Marino 57*15ab8c86SJohn Marino /// Amount of data available in the input buffer. This is modified 58*15ab8c86SJohn Marino /// only by the main thread. 59*15ab8c86SJohn Marino size_t in_size; 60*15ab8c86SJohn Marino 61*15ab8c86SJohn Marino /// Output buffer for this thread. This is set by the main 62*15ab8c86SJohn Marino /// thread every time a new Block is started with this thread 63*15ab8c86SJohn Marino /// structure. 64*15ab8c86SJohn Marino lzma_outbuf *outbuf; 65*15ab8c86SJohn Marino 66*15ab8c86SJohn Marino /// Pointer to the main structure is needed when putting this 67*15ab8c86SJohn Marino /// thread back to the stack of free threads. 68*15ab8c86SJohn Marino lzma_coder *coder; 69*15ab8c86SJohn Marino 70*15ab8c86SJohn Marino /// The allocator is set by the main thread. Since a copy of the 71*15ab8c86SJohn Marino /// pointer is kept here, the application must not change the 72*15ab8c86SJohn Marino /// allocator before calling lzma_end(). 73*15ab8c86SJohn Marino const lzma_allocator *allocator; 74*15ab8c86SJohn Marino 75*15ab8c86SJohn Marino /// Amount of uncompressed data that has already been compressed. 76*15ab8c86SJohn Marino uint64_t progress_in; 77*15ab8c86SJohn Marino 78*15ab8c86SJohn Marino /// Amount of compressed data that is ready. 79*15ab8c86SJohn Marino uint64_t progress_out; 80*15ab8c86SJohn Marino 81*15ab8c86SJohn Marino /// Block encoder 82*15ab8c86SJohn Marino lzma_next_coder block_encoder; 83*15ab8c86SJohn Marino 84*15ab8c86SJohn Marino /// Compression options for this Block 85*15ab8c86SJohn Marino lzma_block block_options; 86*15ab8c86SJohn Marino 87*15ab8c86SJohn Marino /// Next structure in the stack of free worker threads. 88*15ab8c86SJohn Marino worker_thread *next; 89*15ab8c86SJohn Marino 90*15ab8c86SJohn Marino mythread_mutex mutex; 91*15ab8c86SJohn Marino mythread_cond cond; 92*15ab8c86SJohn Marino 93*15ab8c86SJohn Marino /// The ID of this thread is used to join the thread 94*15ab8c86SJohn Marino /// when it's not needed anymore. 95*15ab8c86SJohn Marino mythread thread_id; 96*15ab8c86SJohn Marino }; 97*15ab8c86SJohn Marino 98*15ab8c86SJohn Marino 99*15ab8c86SJohn Marino struct lzma_coder_s { 100*15ab8c86SJohn Marino enum { 101*15ab8c86SJohn Marino SEQ_STREAM_HEADER, 102*15ab8c86SJohn Marino SEQ_BLOCK, 103*15ab8c86SJohn Marino SEQ_INDEX, 104*15ab8c86SJohn Marino SEQ_STREAM_FOOTER, 105*15ab8c86SJohn Marino } sequence; 106*15ab8c86SJohn Marino 107*15ab8c86SJohn Marino /// Start a new Block every block_size bytes of input unless 108*15ab8c86SJohn Marino /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier. 109*15ab8c86SJohn Marino size_t block_size; 110*15ab8c86SJohn Marino 111*15ab8c86SJohn Marino /// The filter chain currently in use 112*15ab8c86SJohn Marino lzma_filter filters[LZMA_FILTERS_MAX + 1]; 113*15ab8c86SJohn Marino 114*15ab8c86SJohn Marino 115*15ab8c86SJohn Marino /// Index to hold sizes of the Blocks 116*15ab8c86SJohn Marino lzma_index *index; 117*15ab8c86SJohn Marino 118*15ab8c86SJohn Marino /// Index encoder 119*15ab8c86SJohn Marino lzma_next_coder index_encoder; 120*15ab8c86SJohn Marino 121*15ab8c86SJohn Marino 122*15ab8c86SJohn Marino /// Stream Flags for encoding the Stream Header and Stream Footer. 123*15ab8c86SJohn Marino lzma_stream_flags stream_flags; 124*15ab8c86SJohn Marino 125*15ab8c86SJohn Marino /// Buffer to hold Stream Header and Stream Footer. 126*15ab8c86SJohn Marino uint8_t header[LZMA_STREAM_HEADER_SIZE]; 127*15ab8c86SJohn Marino 128*15ab8c86SJohn Marino /// Read position in header[] 129*15ab8c86SJohn Marino size_t header_pos; 130*15ab8c86SJohn Marino 131*15ab8c86SJohn Marino 132*15ab8c86SJohn Marino /// Output buffer queue for compressed data 133*15ab8c86SJohn Marino lzma_outq outq; 134*15ab8c86SJohn Marino 135*15ab8c86SJohn Marino 136*15ab8c86SJohn Marino /// Maximum wait time if cannot use all the input and cannot 137*15ab8c86SJohn Marino /// fill the output buffer. This is in milliseconds. 138*15ab8c86SJohn Marino uint32_t timeout; 139*15ab8c86SJohn Marino 140*15ab8c86SJohn Marino 141*15ab8c86SJohn Marino /// Error code from a worker thread 142*15ab8c86SJohn Marino lzma_ret thread_error; 143*15ab8c86SJohn Marino 144*15ab8c86SJohn Marino /// Array of allocated thread-specific structures 145*15ab8c86SJohn Marino worker_thread *threads; 146*15ab8c86SJohn Marino 147*15ab8c86SJohn Marino /// Number of structures in "threads" above. This is also the 148*15ab8c86SJohn Marino /// number of threads that will be created at maximum. 149*15ab8c86SJohn Marino uint32_t threads_max; 150*15ab8c86SJohn Marino 151*15ab8c86SJohn Marino /// Number of thread structures that have been initialized, and 152*15ab8c86SJohn Marino /// thus the number of worker threads actually created so far. 153*15ab8c86SJohn Marino uint32_t threads_initialized; 154*15ab8c86SJohn Marino 155*15ab8c86SJohn Marino /// Stack of free threads. When a thread finishes, it puts itself 156*15ab8c86SJohn Marino /// back into this stack. This starts as empty because threads 157*15ab8c86SJohn Marino /// are created only when actually needed. 158*15ab8c86SJohn Marino worker_thread *threads_free; 159*15ab8c86SJohn Marino 160*15ab8c86SJohn Marino /// The most recent worker thread to which the main thread writes 161*15ab8c86SJohn Marino /// the new input from the application. 162*15ab8c86SJohn Marino worker_thread *thr; 163*15ab8c86SJohn Marino 164*15ab8c86SJohn Marino 165*15ab8c86SJohn Marino /// Amount of uncompressed data in Blocks that have already 166*15ab8c86SJohn Marino /// been finished. 167*15ab8c86SJohn Marino uint64_t progress_in; 168*15ab8c86SJohn Marino 169*15ab8c86SJohn Marino /// Amount of compressed data in Stream Header + Blocks that 170*15ab8c86SJohn Marino /// have already been finished. 171*15ab8c86SJohn Marino uint64_t progress_out; 172*15ab8c86SJohn Marino 173*15ab8c86SJohn Marino 174*15ab8c86SJohn Marino mythread_mutex mutex; 175*15ab8c86SJohn Marino mythread_cond cond; 176*15ab8c86SJohn Marino }; 177*15ab8c86SJohn Marino 178*15ab8c86SJohn Marino 179*15ab8c86SJohn Marino /// Tell the main thread that something has gone wrong. 180*15ab8c86SJohn Marino static void 181*15ab8c86SJohn Marino worker_error(worker_thread *thr, lzma_ret ret) 182*15ab8c86SJohn Marino { 183*15ab8c86SJohn Marino assert(ret != LZMA_OK); 184*15ab8c86SJohn Marino assert(ret != LZMA_STREAM_END); 185*15ab8c86SJohn Marino 186*15ab8c86SJohn Marino mythread_sync(thr->coder->mutex) { 187*15ab8c86SJohn Marino if (thr->coder->thread_error == LZMA_OK) 188*15ab8c86SJohn Marino thr->coder->thread_error = ret; 189*15ab8c86SJohn Marino 190*15ab8c86SJohn Marino mythread_cond_signal(&thr->coder->cond); 191*15ab8c86SJohn Marino } 192*15ab8c86SJohn Marino 193*15ab8c86SJohn Marino return; 194*15ab8c86SJohn Marino } 195*15ab8c86SJohn Marino 196*15ab8c86SJohn Marino 197*15ab8c86SJohn Marino static worker_state 198*15ab8c86SJohn Marino worker_encode(worker_thread *thr, worker_state state) 199*15ab8c86SJohn Marino { 200*15ab8c86SJohn Marino assert(thr->progress_in == 0); 201*15ab8c86SJohn Marino assert(thr->progress_out == 0); 202*15ab8c86SJohn Marino 203*15ab8c86SJohn Marino // Set the Block options. 204*15ab8c86SJohn Marino thr->block_options = (lzma_block){ 205*15ab8c86SJohn Marino .version = 0, 206*15ab8c86SJohn Marino .check = thr->coder->stream_flags.check, 207*15ab8c86SJohn Marino .compressed_size = thr->coder->outq.buf_size_max, 208*15ab8c86SJohn Marino .uncompressed_size = thr->coder->block_size, 209*15ab8c86SJohn Marino 210*15ab8c86SJohn Marino // TODO: To allow changing the filter chain, the filters 211*15ab8c86SJohn Marino // array must be copied to each worker_thread. 212*15ab8c86SJohn Marino .filters = thr->coder->filters, 213*15ab8c86SJohn Marino }; 214*15ab8c86SJohn Marino 215*15ab8c86SJohn Marino // Calculate maximum size of the Block Header. This amount is 216*15ab8c86SJohn Marino // reserved in the beginning of the buffer so that Block Header 217*15ab8c86SJohn Marino // along with Compressed Size and Uncompressed Size can be 218*15ab8c86SJohn Marino // written there. 219*15ab8c86SJohn Marino lzma_ret ret = lzma_block_header_size(&thr->block_options); 220*15ab8c86SJohn Marino if (ret != LZMA_OK) { 221*15ab8c86SJohn Marino worker_error(thr, ret); 222*15ab8c86SJohn Marino return THR_STOP; 223*15ab8c86SJohn Marino } 224*15ab8c86SJohn Marino 225*15ab8c86SJohn Marino // Initialize the Block encoder. 226*15ab8c86SJohn Marino ret = lzma_block_encoder_init(&thr->block_encoder, 227*15ab8c86SJohn Marino thr->allocator, &thr->block_options); 228*15ab8c86SJohn Marino if (ret != LZMA_OK) { 229*15ab8c86SJohn Marino worker_error(thr, ret); 230*15ab8c86SJohn Marino return THR_STOP; 231*15ab8c86SJohn Marino } 232*15ab8c86SJohn Marino 233*15ab8c86SJohn Marino size_t in_pos = 0; 234*15ab8c86SJohn Marino size_t in_size = 0; 235*15ab8c86SJohn Marino 236*15ab8c86SJohn Marino thr->outbuf->size = thr->block_options.header_size; 237*15ab8c86SJohn Marino const size_t out_size = thr->coder->outq.buf_size_max; 238*15ab8c86SJohn Marino 239*15ab8c86SJohn Marino do { 240*15ab8c86SJohn Marino mythread_sync(thr->mutex) { 241*15ab8c86SJohn Marino // Store in_pos and out_pos into *thr so that 242*15ab8c86SJohn Marino // an application may read them via 243*15ab8c86SJohn Marino // lzma_get_progress() to get progress information. 244*15ab8c86SJohn Marino // 245*15ab8c86SJohn Marino // NOTE: These aren't updated when the encoding 246*15ab8c86SJohn Marino // finishes. Instead, the final values are taken 247*15ab8c86SJohn Marino // later from thr->outbuf. 248*15ab8c86SJohn Marino thr->progress_in = in_pos; 249*15ab8c86SJohn Marino thr->progress_out = thr->outbuf->size; 250*15ab8c86SJohn Marino 251*15ab8c86SJohn Marino while (in_size == thr->in_size 252*15ab8c86SJohn Marino && thr->state == THR_RUN) 253*15ab8c86SJohn Marino mythread_cond_wait(&thr->cond, &thr->mutex); 254*15ab8c86SJohn Marino 255*15ab8c86SJohn Marino state = thr->state; 256*15ab8c86SJohn Marino in_size = thr->in_size; 257*15ab8c86SJohn Marino } 258*15ab8c86SJohn Marino 259*15ab8c86SJohn Marino // Return if we were asked to stop or exit. 260*15ab8c86SJohn Marino if (state >= THR_STOP) 261*15ab8c86SJohn Marino return state; 262*15ab8c86SJohn Marino 263*15ab8c86SJohn Marino lzma_action action = state == THR_FINISH 264*15ab8c86SJohn Marino ? LZMA_FINISH : LZMA_RUN; 265*15ab8c86SJohn Marino 266*15ab8c86SJohn Marino // Limit the amount of input given to the Block encoder 267*15ab8c86SJohn Marino // at once. This way this thread can react fairly quickly 268*15ab8c86SJohn Marino // if the main thread wants us to stop or exit. 269*15ab8c86SJohn Marino static const size_t in_chunk_max = 16384; 270*15ab8c86SJohn Marino size_t in_limit = in_size; 271*15ab8c86SJohn Marino if (in_size - in_pos > in_chunk_max) { 272*15ab8c86SJohn Marino in_limit = in_pos + in_chunk_max; 273*15ab8c86SJohn Marino action = LZMA_RUN; 274*15ab8c86SJohn Marino } 275*15ab8c86SJohn Marino 276*15ab8c86SJohn Marino ret = thr->block_encoder.code( 277*15ab8c86SJohn Marino thr->block_encoder.coder, thr->allocator, 278*15ab8c86SJohn Marino thr->in, &in_pos, in_limit, thr->outbuf->buf, 279*15ab8c86SJohn Marino &thr->outbuf->size, out_size, action); 280*15ab8c86SJohn Marino } while (ret == LZMA_OK && thr->outbuf->size < out_size); 281*15ab8c86SJohn Marino 282*15ab8c86SJohn Marino switch (ret) { 283*15ab8c86SJohn Marino case LZMA_STREAM_END: 284*15ab8c86SJohn Marino assert(state == THR_FINISH); 285*15ab8c86SJohn Marino 286*15ab8c86SJohn Marino // Encode the Block Header. By doing it after 287*15ab8c86SJohn Marino // the compression, we can store the Compressed Size 288*15ab8c86SJohn Marino // and Uncompressed Size fields. 289*15ab8c86SJohn Marino ret = lzma_block_header_encode(&thr->block_options, 290*15ab8c86SJohn Marino thr->outbuf->buf); 291*15ab8c86SJohn Marino if (ret != LZMA_OK) { 292*15ab8c86SJohn Marino worker_error(thr, ret); 293*15ab8c86SJohn Marino return THR_STOP; 294*15ab8c86SJohn Marino } 295*15ab8c86SJohn Marino 296*15ab8c86SJohn Marino break; 297*15ab8c86SJohn Marino 298*15ab8c86SJohn Marino case LZMA_OK: 299*15ab8c86SJohn Marino // The data was incompressible. Encode it using uncompressed 300*15ab8c86SJohn Marino // LZMA2 chunks. 301*15ab8c86SJohn Marino // 302*15ab8c86SJohn Marino // First wait that we have gotten all the input. 303*15ab8c86SJohn Marino mythread_sync(thr->mutex) { 304*15ab8c86SJohn Marino while (thr->state == THR_RUN) 305*15ab8c86SJohn Marino mythread_cond_wait(&thr->cond, &thr->mutex); 306*15ab8c86SJohn Marino 307*15ab8c86SJohn Marino state = thr->state; 308*15ab8c86SJohn Marino in_size = thr->in_size; 309*15ab8c86SJohn Marino } 310*15ab8c86SJohn Marino 311*15ab8c86SJohn Marino if (state >= THR_STOP) 312*15ab8c86SJohn Marino return state; 313*15ab8c86SJohn Marino 314*15ab8c86SJohn Marino // Do the encoding. This takes care of the Block Header too. 315*15ab8c86SJohn Marino thr->outbuf->size = 0; 316*15ab8c86SJohn Marino ret = lzma_block_uncomp_encode(&thr->block_options, 317*15ab8c86SJohn Marino thr->in, in_size, thr->outbuf->buf, 318*15ab8c86SJohn Marino &thr->outbuf->size, out_size); 319*15ab8c86SJohn Marino 320*15ab8c86SJohn Marino // It shouldn't fail. 321*15ab8c86SJohn Marino if (ret != LZMA_OK) { 322*15ab8c86SJohn Marino worker_error(thr, LZMA_PROG_ERROR); 323*15ab8c86SJohn Marino return THR_STOP; 324*15ab8c86SJohn Marino } 325*15ab8c86SJohn Marino 326*15ab8c86SJohn Marino break; 327*15ab8c86SJohn Marino 328*15ab8c86SJohn Marino default: 329*15ab8c86SJohn Marino worker_error(thr, ret); 330*15ab8c86SJohn Marino return THR_STOP; 331*15ab8c86SJohn Marino } 332*15ab8c86SJohn Marino 333*15ab8c86SJohn Marino // Set the size information that will be read by the main thread 334*15ab8c86SJohn Marino // to write the Index field. 335*15ab8c86SJohn Marino thr->outbuf->unpadded_size 336*15ab8c86SJohn Marino = lzma_block_unpadded_size(&thr->block_options); 337*15ab8c86SJohn Marino assert(thr->outbuf->unpadded_size != 0); 338*15ab8c86SJohn Marino thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; 339*15ab8c86SJohn Marino 340*15ab8c86SJohn Marino return THR_FINISH; 341*15ab8c86SJohn Marino } 342*15ab8c86SJohn Marino 343*15ab8c86SJohn Marino 344*15ab8c86SJohn Marino static MYTHREAD_RET_TYPE 345*15ab8c86SJohn Marino worker_start(void *thr_ptr) 346*15ab8c86SJohn Marino { 347*15ab8c86SJohn Marino worker_thread *thr = thr_ptr; 348*15ab8c86SJohn Marino worker_state state = THR_IDLE; // Init to silence a warning 349*15ab8c86SJohn Marino 350*15ab8c86SJohn Marino while (true) { 351*15ab8c86SJohn Marino // Wait for work. 352*15ab8c86SJohn Marino mythread_sync(thr->mutex) { 353*15ab8c86SJohn Marino while (true) { 354*15ab8c86SJohn Marino // The thread is already idle so if we are 355*15ab8c86SJohn Marino // requested to stop, just set the state. 356*15ab8c86SJohn Marino if (thr->state == THR_STOP) { 357*15ab8c86SJohn Marino thr->state = THR_IDLE; 358*15ab8c86SJohn Marino mythread_cond_signal(&thr->cond); 359*15ab8c86SJohn Marino } 360*15ab8c86SJohn Marino 361*15ab8c86SJohn Marino state = thr->state; 362*15ab8c86SJohn Marino if (state != THR_IDLE) 363*15ab8c86SJohn Marino break; 364*15ab8c86SJohn Marino 365*15ab8c86SJohn Marino mythread_cond_wait(&thr->cond, &thr->mutex); 366*15ab8c86SJohn Marino } 367*15ab8c86SJohn Marino } 368*15ab8c86SJohn Marino 369*15ab8c86SJohn Marino assert(state != THR_IDLE); 370*15ab8c86SJohn Marino assert(state != THR_STOP); 371*15ab8c86SJohn Marino 372*15ab8c86SJohn Marino if (state <= THR_FINISH) 373*15ab8c86SJohn Marino state = worker_encode(thr, state); 374*15ab8c86SJohn Marino 375*15ab8c86SJohn Marino if (state == THR_EXIT) 376*15ab8c86SJohn Marino break; 377*15ab8c86SJohn Marino 378*15ab8c86SJohn Marino // Mark the thread as idle unless the main thread has 379*15ab8c86SJohn Marino // told us to exit. Signal is needed for the case 380*15ab8c86SJohn Marino // where the main thread is waiting for the threads to stop. 381*15ab8c86SJohn Marino mythread_sync(thr->mutex) { 382*15ab8c86SJohn Marino if (thr->state != THR_EXIT) { 383*15ab8c86SJohn Marino thr->state = THR_IDLE; 384*15ab8c86SJohn Marino mythread_cond_signal(&thr->cond); 385*15ab8c86SJohn Marino } 386*15ab8c86SJohn Marino } 387*15ab8c86SJohn Marino 388*15ab8c86SJohn Marino mythread_sync(thr->coder->mutex) { 389*15ab8c86SJohn Marino // Mark the output buffer as finished if 390*15ab8c86SJohn Marino // no errors occurred. 391*15ab8c86SJohn Marino thr->outbuf->finished = state == THR_FINISH; 392*15ab8c86SJohn Marino 393*15ab8c86SJohn Marino // Update the main progress info. 394*15ab8c86SJohn Marino thr->coder->progress_in 395*15ab8c86SJohn Marino += thr->outbuf->uncompressed_size; 396*15ab8c86SJohn Marino thr->coder->progress_out += thr->outbuf->size; 397*15ab8c86SJohn Marino thr->progress_in = 0; 398*15ab8c86SJohn Marino thr->progress_out = 0; 399*15ab8c86SJohn Marino 400*15ab8c86SJohn Marino // Return this thread to the stack of free threads. 401*15ab8c86SJohn Marino thr->next = thr->coder->threads_free; 402*15ab8c86SJohn Marino thr->coder->threads_free = thr; 403*15ab8c86SJohn Marino 404*15ab8c86SJohn Marino mythread_cond_signal(&thr->coder->cond); 405*15ab8c86SJohn Marino } 406*15ab8c86SJohn Marino } 407*15ab8c86SJohn Marino 408*15ab8c86SJohn Marino // Exiting, free the resources. 409*15ab8c86SJohn Marino mythread_mutex_destroy(&thr->mutex); 410*15ab8c86SJohn Marino mythread_cond_destroy(&thr->cond); 411*15ab8c86SJohn Marino 412*15ab8c86SJohn Marino lzma_next_end(&thr->block_encoder, thr->allocator); 413*15ab8c86SJohn Marino lzma_free(thr->in, thr->allocator); 414*15ab8c86SJohn Marino return MYTHREAD_RET_VALUE; 415*15ab8c86SJohn Marino } 416*15ab8c86SJohn Marino 417*15ab8c86SJohn Marino 418*15ab8c86SJohn Marino /// Make the threads stop but not exit. Optionally wait for them to stop. 419*15ab8c86SJohn Marino static void 420*15ab8c86SJohn Marino threads_stop(lzma_coder *coder, bool wait_for_threads) 421*15ab8c86SJohn Marino { 422*15ab8c86SJohn Marino // Tell the threads to stop. 423*15ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 424*15ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) { 425*15ab8c86SJohn Marino coder->threads[i].state = THR_STOP; 426*15ab8c86SJohn Marino mythread_cond_signal(&coder->threads[i].cond); 427*15ab8c86SJohn Marino } 428*15ab8c86SJohn Marino } 429*15ab8c86SJohn Marino 430*15ab8c86SJohn Marino if (!wait_for_threads) 431*15ab8c86SJohn Marino return; 432*15ab8c86SJohn Marino 433*15ab8c86SJohn Marino // Wait for the threads to settle in the idle state. 434*15ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 435*15ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) { 436*15ab8c86SJohn Marino while (coder->threads[i].state != THR_IDLE) 437*15ab8c86SJohn Marino mythread_cond_wait(&coder->threads[i].cond, 438*15ab8c86SJohn Marino &coder->threads[i].mutex); 439*15ab8c86SJohn Marino } 440*15ab8c86SJohn Marino } 441*15ab8c86SJohn Marino 442*15ab8c86SJohn Marino return; 443*15ab8c86SJohn Marino } 444*15ab8c86SJohn Marino 445*15ab8c86SJohn Marino 446*15ab8c86SJohn Marino /// Stop the threads and free the resources associated with them. 447*15ab8c86SJohn Marino /// Wait until the threads have exited. 448*15ab8c86SJohn Marino static void 449*15ab8c86SJohn Marino threads_end(lzma_coder *coder, const lzma_allocator *allocator) 450*15ab8c86SJohn Marino { 451*15ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 452*15ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) { 453*15ab8c86SJohn Marino coder->threads[i].state = THR_EXIT; 454*15ab8c86SJohn Marino mythread_cond_signal(&coder->threads[i].cond); 455*15ab8c86SJohn Marino } 456*15ab8c86SJohn Marino } 457*15ab8c86SJohn Marino 458*15ab8c86SJohn Marino for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 459*15ab8c86SJohn Marino int ret = mythread_join(coder->threads[i].thread_id); 460*15ab8c86SJohn Marino assert(ret == 0); 461*15ab8c86SJohn Marino (void)ret; 462*15ab8c86SJohn Marino } 463*15ab8c86SJohn Marino 464*15ab8c86SJohn Marino lzma_free(coder->threads, allocator); 465*15ab8c86SJohn Marino return; 466*15ab8c86SJohn Marino } 467*15ab8c86SJohn Marino 468*15ab8c86SJohn Marino 469*15ab8c86SJohn Marino /// Initialize a new worker_thread structure and create a new thread. 470*15ab8c86SJohn Marino static lzma_ret 471*15ab8c86SJohn Marino initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator) 472*15ab8c86SJohn Marino { 473*15ab8c86SJohn Marino worker_thread *thr = &coder->threads[coder->threads_initialized]; 474*15ab8c86SJohn Marino 475*15ab8c86SJohn Marino thr->in = lzma_alloc(coder->block_size, allocator); 476*15ab8c86SJohn Marino if (thr->in == NULL) 477*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 478*15ab8c86SJohn Marino 479*15ab8c86SJohn Marino if (mythread_mutex_init(&thr->mutex)) 480*15ab8c86SJohn Marino goto error_mutex; 481*15ab8c86SJohn Marino 482*15ab8c86SJohn Marino if (mythread_cond_init(&thr->cond)) 483*15ab8c86SJohn Marino goto error_cond; 484*15ab8c86SJohn Marino 485*15ab8c86SJohn Marino thr->state = THR_IDLE; 486*15ab8c86SJohn Marino thr->allocator = allocator; 487*15ab8c86SJohn Marino thr->coder = coder; 488*15ab8c86SJohn Marino thr->progress_in = 0; 489*15ab8c86SJohn Marino thr->progress_out = 0; 490*15ab8c86SJohn Marino thr->block_encoder = LZMA_NEXT_CODER_INIT; 491*15ab8c86SJohn Marino 492*15ab8c86SJohn Marino if (mythread_create(&thr->thread_id, &worker_start, thr)) 493*15ab8c86SJohn Marino goto error_thread; 494*15ab8c86SJohn Marino 495*15ab8c86SJohn Marino ++coder->threads_initialized; 496*15ab8c86SJohn Marino coder->thr = thr; 497*15ab8c86SJohn Marino 498*15ab8c86SJohn Marino return LZMA_OK; 499*15ab8c86SJohn Marino 500*15ab8c86SJohn Marino error_thread: 501*15ab8c86SJohn Marino mythread_cond_destroy(&thr->cond); 502*15ab8c86SJohn Marino 503*15ab8c86SJohn Marino error_cond: 504*15ab8c86SJohn Marino mythread_mutex_destroy(&thr->mutex); 505*15ab8c86SJohn Marino 506*15ab8c86SJohn Marino error_mutex: 507*15ab8c86SJohn Marino lzma_free(thr->in, allocator); 508*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 509*15ab8c86SJohn Marino } 510*15ab8c86SJohn Marino 511*15ab8c86SJohn Marino 512*15ab8c86SJohn Marino static lzma_ret 513*15ab8c86SJohn Marino get_thread(lzma_coder *coder, const lzma_allocator *allocator) 514*15ab8c86SJohn Marino { 515*15ab8c86SJohn Marino // If there are no free output subqueues, there is no 516*15ab8c86SJohn Marino // point to try getting a thread. 517*15ab8c86SJohn Marino if (!lzma_outq_has_buf(&coder->outq)) 518*15ab8c86SJohn Marino return LZMA_OK; 519*15ab8c86SJohn Marino 520*15ab8c86SJohn Marino // If there is a free structure on the stack, use it. 521*15ab8c86SJohn Marino mythread_sync(coder->mutex) { 522*15ab8c86SJohn Marino if (coder->threads_free != NULL) { 523*15ab8c86SJohn Marino coder->thr = coder->threads_free; 524*15ab8c86SJohn Marino coder->threads_free = coder->threads_free->next; 525*15ab8c86SJohn Marino } 526*15ab8c86SJohn Marino } 527*15ab8c86SJohn Marino 528*15ab8c86SJohn Marino if (coder->thr == NULL) { 529*15ab8c86SJohn Marino // If there are no uninitialized structures left, return. 530*15ab8c86SJohn Marino if (coder->threads_initialized == coder->threads_max) 531*15ab8c86SJohn Marino return LZMA_OK; 532*15ab8c86SJohn Marino 533*15ab8c86SJohn Marino // Initialize a new thread. 534*15ab8c86SJohn Marino return_if_error(initialize_new_thread(coder, allocator)); 535*15ab8c86SJohn Marino } 536*15ab8c86SJohn Marino 537*15ab8c86SJohn Marino // Reset the parts of the thread state that have to be done 538*15ab8c86SJohn Marino // in the main thread. 539*15ab8c86SJohn Marino mythread_sync(coder->thr->mutex) { 540*15ab8c86SJohn Marino coder->thr->state = THR_RUN; 541*15ab8c86SJohn Marino coder->thr->in_size = 0; 542*15ab8c86SJohn Marino coder->thr->outbuf = lzma_outq_get_buf(&coder->outq); 543*15ab8c86SJohn Marino mythread_cond_signal(&coder->thr->cond); 544*15ab8c86SJohn Marino } 545*15ab8c86SJohn Marino 546*15ab8c86SJohn Marino return LZMA_OK; 547*15ab8c86SJohn Marino } 548*15ab8c86SJohn Marino 549*15ab8c86SJohn Marino 550*15ab8c86SJohn Marino static lzma_ret 551*15ab8c86SJohn Marino stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator, 552*15ab8c86SJohn Marino const uint8_t *restrict in, size_t *restrict in_pos, 553*15ab8c86SJohn Marino size_t in_size, lzma_action action) 554*15ab8c86SJohn Marino { 555*15ab8c86SJohn Marino while (*in_pos < in_size 556*15ab8c86SJohn Marino || (coder->thr != NULL && action != LZMA_RUN)) { 557*15ab8c86SJohn Marino if (coder->thr == NULL) { 558*15ab8c86SJohn Marino // Get a new thread. 559*15ab8c86SJohn Marino const lzma_ret ret = get_thread(coder, allocator); 560*15ab8c86SJohn Marino if (coder->thr == NULL) 561*15ab8c86SJohn Marino return ret; 562*15ab8c86SJohn Marino } 563*15ab8c86SJohn Marino 564*15ab8c86SJohn Marino // Copy the input data to thread's buffer. 565*15ab8c86SJohn Marino size_t thr_in_size = coder->thr->in_size; 566*15ab8c86SJohn Marino lzma_bufcpy(in, in_pos, in_size, coder->thr->in, 567*15ab8c86SJohn Marino &thr_in_size, coder->block_size); 568*15ab8c86SJohn Marino 569*15ab8c86SJohn Marino // Tell the Block encoder to finish if 570*15ab8c86SJohn Marino // - it has got block_size bytes of input; or 571*15ab8c86SJohn Marino // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH, 572*15ab8c86SJohn Marino // or LZMA_FULL_BARRIER was used. 573*15ab8c86SJohn Marino // 574*15ab8c86SJohn Marino // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 575*15ab8c86SJohn Marino const bool finish = thr_in_size == coder->block_size 576*15ab8c86SJohn Marino || (*in_pos == in_size && action != LZMA_RUN); 577*15ab8c86SJohn Marino 578*15ab8c86SJohn Marino bool block_error = false; 579*15ab8c86SJohn Marino 580*15ab8c86SJohn Marino mythread_sync(coder->thr->mutex) { 581*15ab8c86SJohn Marino if (coder->thr->state == THR_IDLE) { 582*15ab8c86SJohn Marino // Something has gone wrong with the Block 583*15ab8c86SJohn Marino // encoder. It has set coder->thread_error 584*15ab8c86SJohn Marino // which we will read a few lines later. 585*15ab8c86SJohn Marino block_error = true; 586*15ab8c86SJohn Marino } else { 587*15ab8c86SJohn Marino // Tell the Block encoder its new amount 588*15ab8c86SJohn Marino // of input and update the state if needed. 589*15ab8c86SJohn Marino coder->thr->in_size = thr_in_size; 590*15ab8c86SJohn Marino 591*15ab8c86SJohn Marino if (finish) 592*15ab8c86SJohn Marino coder->thr->state = THR_FINISH; 593*15ab8c86SJohn Marino 594*15ab8c86SJohn Marino mythread_cond_signal(&coder->thr->cond); 595*15ab8c86SJohn Marino } 596*15ab8c86SJohn Marino } 597*15ab8c86SJohn Marino 598*15ab8c86SJohn Marino if (block_error) { 599*15ab8c86SJohn Marino lzma_ret ret; 600*15ab8c86SJohn Marino 601*15ab8c86SJohn Marino mythread_sync(coder->mutex) { 602*15ab8c86SJohn Marino ret = coder->thread_error; 603*15ab8c86SJohn Marino } 604*15ab8c86SJohn Marino 605*15ab8c86SJohn Marino return ret; 606*15ab8c86SJohn Marino } 607*15ab8c86SJohn Marino 608*15ab8c86SJohn Marino if (finish) 609*15ab8c86SJohn Marino coder->thr = NULL; 610*15ab8c86SJohn Marino } 611*15ab8c86SJohn Marino 612*15ab8c86SJohn Marino return LZMA_OK; 613*15ab8c86SJohn Marino } 614*15ab8c86SJohn Marino 615*15ab8c86SJohn Marino 616*15ab8c86SJohn Marino /// Wait until more input can be consumed, more output can be read, or 617*15ab8c86SJohn Marino /// an optional timeout is reached. 618*15ab8c86SJohn Marino static bool 619*15ab8c86SJohn Marino wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs, 620*15ab8c86SJohn Marino bool *has_blocked, bool has_input) 621*15ab8c86SJohn Marino { 622*15ab8c86SJohn Marino if (coder->timeout != 0 && !*has_blocked) { 623*15ab8c86SJohn Marino // Every time when stream_encode_mt() is called via 624*15ab8c86SJohn Marino // lzma_code(), *has_blocked starts as false. We set it 625*15ab8c86SJohn Marino // to true here and calculate the absolute time when 626*15ab8c86SJohn Marino // we must return if there's nothing to do. 627*15ab8c86SJohn Marino // 628*15ab8c86SJohn Marino // The idea of *has_blocked is to avoid unneeded calls 629*15ab8c86SJohn Marino // to mythread_condtime_set(), which may do a syscall 630*15ab8c86SJohn Marino // depending on the operating system. 631*15ab8c86SJohn Marino *has_blocked = true; 632*15ab8c86SJohn Marino mythread_condtime_set(wait_abs, &coder->cond, coder->timeout); 633*15ab8c86SJohn Marino } 634*15ab8c86SJohn Marino 635*15ab8c86SJohn Marino bool timed_out = false; 636*15ab8c86SJohn Marino 637*15ab8c86SJohn Marino mythread_sync(coder->mutex) { 638*15ab8c86SJohn Marino // There are four things that we wait. If one of them 639*15ab8c86SJohn Marino // becomes possible, we return. 640*15ab8c86SJohn Marino // - If there is input left, we need to get a free 641*15ab8c86SJohn Marino // worker thread and an output buffer for it. 642*15ab8c86SJohn Marino // - Data ready to be read from the output queue. 643*15ab8c86SJohn Marino // - A worker thread indicates an error. 644*15ab8c86SJohn Marino // - Time out occurs. 645*15ab8c86SJohn Marino while ((!has_input || coder->threads_free == NULL 646*15ab8c86SJohn Marino || !lzma_outq_has_buf(&coder->outq)) 647*15ab8c86SJohn Marino && !lzma_outq_is_readable(&coder->outq) 648*15ab8c86SJohn Marino && coder->thread_error == LZMA_OK 649*15ab8c86SJohn Marino && !timed_out) { 650*15ab8c86SJohn Marino if (coder->timeout != 0) 651*15ab8c86SJohn Marino timed_out = mythread_cond_timedwait( 652*15ab8c86SJohn Marino &coder->cond, &coder->mutex, 653*15ab8c86SJohn Marino wait_abs) != 0; 654*15ab8c86SJohn Marino else 655*15ab8c86SJohn Marino mythread_cond_wait(&coder->cond, 656*15ab8c86SJohn Marino &coder->mutex); 657*15ab8c86SJohn Marino } 658*15ab8c86SJohn Marino } 659*15ab8c86SJohn Marino 660*15ab8c86SJohn Marino return timed_out; 661*15ab8c86SJohn Marino } 662*15ab8c86SJohn Marino 663*15ab8c86SJohn Marino 664*15ab8c86SJohn Marino static lzma_ret 665*15ab8c86SJohn Marino stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator, 666*15ab8c86SJohn Marino const uint8_t *restrict in, size_t *restrict in_pos, 667*15ab8c86SJohn Marino size_t in_size, uint8_t *restrict out, 668*15ab8c86SJohn Marino size_t *restrict out_pos, size_t out_size, lzma_action action) 669*15ab8c86SJohn Marino { 670*15ab8c86SJohn Marino switch (coder->sequence) { 671*15ab8c86SJohn Marino case SEQ_STREAM_HEADER: 672*15ab8c86SJohn Marino lzma_bufcpy(coder->header, &coder->header_pos, 673*15ab8c86SJohn Marino sizeof(coder->header), 674*15ab8c86SJohn Marino out, out_pos, out_size); 675*15ab8c86SJohn Marino if (coder->header_pos < sizeof(coder->header)) 676*15ab8c86SJohn Marino return LZMA_OK; 677*15ab8c86SJohn Marino 678*15ab8c86SJohn Marino coder->header_pos = 0; 679*15ab8c86SJohn Marino coder->sequence = SEQ_BLOCK; 680*15ab8c86SJohn Marino 681*15ab8c86SJohn Marino // Fall through 682*15ab8c86SJohn Marino 683*15ab8c86SJohn Marino case SEQ_BLOCK: { 684*15ab8c86SJohn Marino // Initialized to silence warnings. 685*15ab8c86SJohn Marino lzma_vli unpadded_size = 0; 686*15ab8c86SJohn Marino lzma_vli uncompressed_size = 0; 687*15ab8c86SJohn Marino lzma_ret ret = LZMA_OK; 688*15ab8c86SJohn Marino 689*15ab8c86SJohn Marino // These are for wait_for_work(). 690*15ab8c86SJohn Marino bool has_blocked = false; 691*15ab8c86SJohn Marino mythread_condtime wait_abs; 692*15ab8c86SJohn Marino 693*15ab8c86SJohn Marino while (true) { 694*15ab8c86SJohn Marino mythread_sync(coder->mutex) { 695*15ab8c86SJohn Marino // Check for Block encoder errors. 696*15ab8c86SJohn Marino ret = coder->thread_error; 697*15ab8c86SJohn Marino if (ret != LZMA_OK) { 698*15ab8c86SJohn Marino assert(ret != LZMA_STREAM_END); 699*15ab8c86SJohn Marino break; 700*15ab8c86SJohn Marino } 701*15ab8c86SJohn Marino 702*15ab8c86SJohn Marino // Try to read compressed data to out[]. 703*15ab8c86SJohn Marino ret = lzma_outq_read(&coder->outq, 704*15ab8c86SJohn Marino out, out_pos, out_size, 705*15ab8c86SJohn Marino &unpadded_size, 706*15ab8c86SJohn Marino &uncompressed_size); 707*15ab8c86SJohn Marino } 708*15ab8c86SJohn Marino 709*15ab8c86SJohn Marino if (ret == LZMA_STREAM_END) { 710*15ab8c86SJohn Marino // End of Block. Add it to the Index. 711*15ab8c86SJohn Marino ret = lzma_index_append(coder->index, 712*15ab8c86SJohn Marino allocator, unpadded_size, 713*15ab8c86SJohn Marino uncompressed_size); 714*15ab8c86SJohn Marino 715*15ab8c86SJohn Marino // If we didn't fill the output buffer yet, 716*15ab8c86SJohn Marino // try to read more data. Maybe the next 717*15ab8c86SJohn Marino // outbuf has been finished already too. 718*15ab8c86SJohn Marino if (*out_pos < out_size) 719*15ab8c86SJohn Marino continue; 720*15ab8c86SJohn Marino } 721*15ab8c86SJohn Marino 722*15ab8c86SJohn Marino if (ret != LZMA_OK) { 723*15ab8c86SJohn Marino // coder->thread_error was set or 724*15ab8c86SJohn Marino // lzma_index_append() failed. 725*15ab8c86SJohn Marino threads_stop(coder, false); 726*15ab8c86SJohn Marino return ret; 727*15ab8c86SJohn Marino } 728*15ab8c86SJohn Marino 729*15ab8c86SJohn Marino // Try to give uncompressed data to a worker thread. 730*15ab8c86SJohn Marino ret = stream_encode_in(coder, allocator, 731*15ab8c86SJohn Marino in, in_pos, in_size, action); 732*15ab8c86SJohn Marino if (ret != LZMA_OK) { 733*15ab8c86SJohn Marino threads_stop(coder, false); 734*15ab8c86SJohn Marino return ret; 735*15ab8c86SJohn Marino } 736*15ab8c86SJohn Marino 737*15ab8c86SJohn Marino // See if we should wait or return. 738*15ab8c86SJohn Marino // 739*15ab8c86SJohn Marino // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 740*15ab8c86SJohn Marino if (*in_pos == in_size) { 741*15ab8c86SJohn Marino // LZMA_RUN: More data is probably coming 742*15ab8c86SJohn Marino // so return to let the caller fill the 743*15ab8c86SJohn Marino // input buffer. 744*15ab8c86SJohn Marino if (action == LZMA_RUN) 745*15ab8c86SJohn Marino return LZMA_OK; 746*15ab8c86SJohn Marino 747*15ab8c86SJohn Marino // LZMA_FULL_BARRIER: The same as with 748*15ab8c86SJohn Marino // LZMA_RUN but tell the caller that the 749*15ab8c86SJohn Marino // barrier was completed. 750*15ab8c86SJohn Marino if (action == LZMA_FULL_BARRIER) 751*15ab8c86SJohn Marino return LZMA_STREAM_END; 752*15ab8c86SJohn Marino 753*15ab8c86SJohn Marino // Finishing or flushing isn't completed until 754*15ab8c86SJohn Marino // all input data has been encoded and copied 755*15ab8c86SJohn Marino // to the output buffer. 756*15ab8c86SJohn Marino if (lzma_outq_is_empty(&coder->outq)) { 757*15ab8c86SJohn Marino // LZMA_FINISH: Continue to encode 758*15ab8c86SJohn Marino // the Index field. 759*15ab8c86SJohn Marino if (action == LZMA_FINISH) 760*15ab8c86SJohn Marino break; 761*15ab8c86SJohn Marino 762*15ab8c86SJohn Marino // LZMA_FULL_FLUSH: Return to tell 763*15ab8c86SJohn Marino // the caller that flushing was 764*15ab8c86SJohn Marino // completed. 765*15ab8c86SJohn Marino if (action == LZMA_FULL_FLUSH) 766*15ab8c86SJohn Marino return LZMA_STREAM_END; 767*15ab8c86SJohn Marino } 768*15ab8c86SJohn Marino } 769*15ab8c86SJohn Marino 770*15ab8c86SJohn Marino // Return if there is no output space left. 771*15ab8c86SJohn Marino // This check must be done after testing the input 772*15ab8c86SJohn Marino // buffer, because we might want to use a different 773*15ab8c86SJohn Marino // return code. 774*15ab8c86SJohn Marino if (*out_pos == out_size) 775*15ab8c86SJohn Marino return LZMA_OK; 776*15ab8c86SJohn Marino 777*15ab8c86SJohn Marino // Neither in nor out has been used completely. 778*15ab8c86SJohn Marino // Wait until there's something we can do. 779*15ab8c86SJohn Marino if (wait_for_work(coder, &wait_abs, &has_blocked, 780*15ab8c86SJohn Marino *in_pos < in_size)) 781*15ab8c86SJohn Marino return LZMA_TIMED_OUT; 782*15ab8c86SJohn Marino } 783*15ab8c86SJohn Marino 784*15ab8c86SJohn Marino // All Blocks have been encoded and the threads have stopped. 785*15ab8c86SJohn Marino // Prepare to encode the Index field. 786*15ab8c86SJohn Marino return_if_error(lzma_index_encoder_init( 787*15ab8c86SJohn Marino &coder->index_encoder, allocator, 788*15ab8c86SJohn Marino coder->index)); 789*15ab8c86SJohn Marino coder->sequence = SEQ_INDEX; 790*15ab8c86SJohn Marino 791*15ab8c86SJohn Marino // Update the progress info to take the Index and 792*15ab8c86SJohn Marino // Stream Footer into account. Those are very fast to encode 793*15ab8c86SJohn Marino // so in terms of progress information they can be thought 794*15ab8c86SJohn Marino // to be ready to be copied out. 795*15ab8c86SJohn Marino coder->progress_out += lzma_index_size(coder->index) 796*15ab8c86SJohn Marino + LZMA_STREAM_HEADER_SIZE; 797*15ab8c86SJohn Marino } 798*15ab8c86SJohn Marino 799*15ab8c86SJohn Marino // Fall through 800*15ab8c86SJohn Marino 801*15ab8c86SJohn Marino case SEQ_INDEX: { 802*15ab8c86SJohn Marino // Call the Index encoder. It doesn't take any input, so 803*15ab8c86SJohn Marino // those pointers can be NULL. 804*15ab8c86SJohn Marino const lzma_ret ret = coder->index_encoder.code( 805*15ab8c86SJohn Marino coder->index_encoder.coder, allocator, 806*15ab8c86SJohn Marino NULL, NULL, 0, 807*15ab8c86SJohn Marino out, out_pos, out_size, LZMA_RUN); 808*15ab8c86SJohn Marino if (ret != LZMA_STREAM_END) 809*15ab8c86SJohn Marino return ret; 810*15ab8c86SJohn Marino 811*15ab8c86SJohn Marino // Encode the Stream Footer into coder->buffer. 812*15ab8c86SJohn Marino coder->stream_flags.backward_size 813*15ab8c86SJohn Marino = lzma_index_size(coder->index); 814*15ab8c86SJohn Marino if (lzma_stream_footer_encode(&coder->stream_flags, 815*15ab8c86SJohn Marino coder->header) != LZMA_OK) 816*15ab8c86SJohn Marino return LZMA_PROG_ERROR; 817*15ab8c86SJohn Marino 818*15ab8c86SJohn Marino coder->sequence = SEQ_STREAM_FOOTER; 819*15ab8c86SJohn Marino } 820*15ab8c86SJohn Marino 821*15ab8c86SJohn Marino // Fall through 822*15ab8c86SJohn Marino 823*15ab8c86SJohn Marino case SEQ_STREAM_FOOTER: 824*15ab8c86SJohn Marino lzma_bufcpy(coder->header, &coder->header_pos, 825*15ab8c86SJohn Marino sizeof(coder->header), 826*15ab8c86SJohn Marino out, out_pos, out_size); 827*15ab8c86SJohn Marino return coder->header_pos < sizeof(coder->header) 828*15ab8c86SJohn Marino ? LZMA_OK : LZMA_STREAM_END; 829*15ab8c86SJohn Marino } 830*15ab8c86SJohn Marino 831*15ab8c86SJohn Marino assert(0); 832*15ab8c86SJohn Marino return LZMA_PROG_ERROR; 833*15ab8c86SJohn Marino } 834*15ab8c86SJohn Marino 835*15ab8c86SJohn Marino 836*15ab8c86SJohn Marino static void 837*15ab8c86SJohn Marino stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator) 838*15ab8c86SJohn Marino { 839*15ab8c86SJohn Marino // Threads must be killed before the output queue can be freed. 840*15ab8c86SJohn Marino threads_end(coder, allocator); 841*15ab8c86SJohn Marino lzma_outq_end(&coder->outq, allocator); 842*15ab8c86SJohn Marino 843*15ab8c86SJohn Marino for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i) 844*15ab8c86SJohn Marino lzma_free(coder->filters[i].options, allocator); 845*15ab8c86SJohn Marino 846*15ab8c86SJohn Marino lzma_next_end(&coder->index_encoder, allocator); 847*15ab8c86SJohn Marino lzma_index_end(coder->index, allocator); 848*15ab8c86SJohn Marino 849*15ab8c86SJohn Marino mythread_cond_destroy(&coder->cond); 850*15ab8c86SJohn Marino mythread_mutex_destroy(&coder->mutex); 851*15ab8c86SJohn Marino 852*15ab8c86SJohn Marino lzma_free(coder, allocator); 853*15ab8c86SJohn Marino return; 854*15ab8c86SJohn Marino } 855*15ab8c86SJohn Marino 856*15ab8c86SJohn Marino 857*15ab8c86SJohn Marino /// Options handling for lzma_stream_encoder_mt_init() and 858*15ab8c86SJohn Marino /// lzma_stream_encoder_mt_memusage() 859*15ab8c86SJohn Marino static lzma_ret 860*15ab8c86SJohn Marino get_options(const lzma_mt *options, lzma_options_easy *opt_easy, 861*15ab8c86SJohn Marino const lzma_filter **filters, uint64_t *block_size, 862*15ab8c86SJohn Marino uint64_t *outbuf_size_max) 863*15ab8c86SJohn Marino { 864*15ab8c86SJohn Marino // Validate some of the options. 865*15ab8c86SJohn Marino if (options == NULL) 866*15ab8c86SJohn Marino return LZMA_PROG_ERROR; 867*15ab8c86SJohn Marino 868*15ab8c86SJohn Marino if (options->flags != 0 || options->threads == 0 869*15ab8c86SJohn Marino || options->threads > LZMA_THREADS_MAX) 870*15ab8c86SJohn Marino return LZMA_OPTIONS_ERROR; 871*15ab8c86SJohn Marino 872*15ab8c86SJohn Marino if (options->filters != NULL) { 873*15ab8c86SJohn Marino // Filter chain was given, use it as is. 874*15ab8c86SJohn Marino *filters = options->filters; 875*15ab8c86SJohn Marino } else { 876*15ab8c86SJohn Marino // Use a preset. 877*15ab8c86SJohn Marino if (lzma_easy_preset(opt_easy, options->preset)) 878*15ab8c86SJohn Marino return LZMA_OPTIONS_ERROR; 879*15ab8c86SJohn Marino 880*15ab8c86SJohn Marino *filters = opt_easy->filters; 881*15ab8c86SJohn Marino } 882*15ab8c86SJohn Marino 883*15ab8c86SJohn Marino // Block size 884*15ab8c86SJohn Marino if (options->block_size > 0) { 885*15ab8c86SJohn Marino if (options->block_size > BLOCK_SIZE_MAX) 886*15ab8c86SJohn Marino return LZMA_OPTIONS_ERROR; 887*15ab8c86SJohn Marino 888*15ab8c86SJohn Marino *block_size = options->block_size; 889*15ab8c86SJohn Marino } else { 890*15ab8c86SJohn Marino // Determine the Block size from the filter chain. 891*15ab8c86SJohn Marino *block_size = lzma_mt_block_size(*filters); 892*15ab8c86SJohn Marino if (*block_size == 0) 893*15ab8c86SJohn Marino return LZMA_OPTIONS_ERROR; 894*15ab8c86SJohn Marino 895*15ab8c86SJohn Marino assert(*block_size <= BLOCK_SIZE_MAX); 896*15ab8c86SJohn Marino } 897*15ab8c86SJohn Marino 898*15ab8c86SJohn Marino // Calculate the maximum amount output that a single output buffer 899*15ab8c86SJohn Marino // may need to hold. This is the same as the maximum total size of 900*15ab8c86SJohn Marino // a Block. 901*15ab8c86SJohn Marino *outbuf_size_max = lzma_block_buffer_bound64(*block_size); 902*15ab8c86SJohn Marino if (*outbuf_size_max == 0) 903*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 904*15ab8c86SJohn Marino 905*15ab8c86SJohn Marino return LZMA_OK; 906*15ab8c86SJohn Marino } 907*15ab8c86SJohn Marino 908*15ab8c86SJohn Marino 909*15ab8c86SJohn Marino static void 910*15ab8c86SJohn Marino get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out) 911*15ab8c86SJohn Marino { 912*15ab8c86SJohn Marino // Lock coder->mutex to prevent finishing threads from moving their 913*15ab8c86SJohn Marino // progress info from the worker_thread structure to lzma_coder. 914*15ab8c86SJohn Marino mythread_sync(coder->mutex) { 915*15ab8c86SJohn Marino *progress_in = coder->progress_in; 916*15ab8c86SJohn Marino *progress_out = coder->progress_out; 917*15ab8c86SJohn Marino 918*15ab8c86SJohn Marino for (size_t i = 0; i < coder->threads_initialized; ++i) { 919*15ab8c86SJohn Marino mythread_sync(coder->threads[i].mutex) { 920*15ab8c86SJohn Marino *progress_in += coder->threads[i].progress_in; 921*15ab8c86SJohn Marino *progress_out += coder->threads[i] 922*15ab8c86SJohn Marino .progress_out; 923*15ab8c86SJohn Marino } 924*15ab8c86SJohn Marino } 925*15ab8c86SJohn Marino } 926*15ab8c86SJohn Marino 927*15ab8c86SJohn Marino return; 928*15ab8c86SJohn Marino } 929*15ab8c86SJohn Marino 930*15ab8c86SJohn Marino 931*15ab8c86SJohn Marino static lzma_ret 932*15ab8c86SJohn Marino stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, 933*15ab8c86SJohn Marino const lzma_mt *options) 934*15ab8c86SJohn Marino { 935*15ab8c86SJohn Marino lzma_next_coder_init(&stream_encoder_mt_init, next, allocator); 936*15ab8c86SJohn Marino 937*15ab8c86SJohn Marino // Get the filter chain. 938*15ab8c86SJohn Marino lzma_options_easy easy; 939*15ab8c86SJohn Marino const lzma_filter *filters; 940*15ab8c86SJohn Marino uint64_t block_size; 941*15ab8c86SJohn Marino uint64_t outbuf_size_max; 942*15ab8c86SJohn Marino return_if_error(get_options(options, &easy, &filters, 943*15ab8c86SJohn Marino &block_size, &outbuf_size_max)); 944*15ab8c86SJohn Marino 945*15ab8c86SJohn Marino #if SIZE_MAX < UINT64_MAX 946*15ab8c86SJohn Marino if (block_size > SIZE_MAX) 947*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 948*15ab8c86SJohn Marino #endif 949*15ab8c86SJohn Marino 950*15ab8c86SJohn Marino // Validate the filter chain so that we can give an error in this 951*15ab8c86SJohn Marino // function instead of delaying it to the first call to lzma_code(). 952*15ab8c86SJohn Marino // The memory usage calculation verifies the filter chain as 953*15ab8c86SJohn Marino // a side effect so we take advatange of that. 954*15ab8c86SJohn Marino if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) 955*15ab8c86SJohn Marino return LZMA_OPTIONS_ERROR; 956*15ab8c86SJohn Marino 957*15ab8c86SJohn Marino // Validate the Check ID. 958*15ab8c86SJohn Marino if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX) 959*15ab8c86SJohn Marino return LZMA_PROG_ERROR; 960*15ab8c86SJohn Marino 961*15ab8c86SJohn Marino if (!lzma_check_is_supported(options->check)) 962*15ab8c86SJohn Marino return LZMA_UNSUPPORTED_CHECK; 963*15ab8c86SJohn Marino 964*15ab8c86SJohn Marino // Allocate and initialize the base structure if needed. 965*15ab8c86SJohn Marino if (next->coder == NULL) { 966*15ab8c86SJohn Marino next->coder = lzma_alloc(sizeof(lzma_coder), allocator); 967*15ab8c86SJohn Marino if (next->coder == NULL) 968*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 969*15ab8c86SJohn Marino 970*15ab8c86SJohn Marino // For the mutex and condition variable initializations 971*15ab8c86SJohn Marino // the error handling has to be done here because 972*15ab8c86SJohn Marino // stream_encoder_mt_end() doesn't know if they have 973*15ab8c86SJohn Marino // already been initialized or not. 974*15ab8c86SJohn Marino if (mythread_mutex_init(&next->coder->mutex)) { 975*15ab8c86SJohn Marino lzma_free(next->coder, allocator); 976*15ab8c86SJohn Marino next->coder = NULL; 977*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 978*15ab8c86SJohn Marino } 979*15ab8c86SJohn Marino 980*15ab8c86SJohn Marino if (mythread_cond_init(&next->coder->cond)) { 981*15ab8c86SJohn Marino mythread_mutex_destroy(&next->coder->mutex); 982*15ab8c86SJohn Marino lzma_free(next->coder, allocator); 983*15ab8c86SJohn Marino next->coder = NULL; 984*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 985*15ab8c86SJohn Marino } 986*15ab8c86SJohn Marino 987*15ab8c86SJohn Marino next->code = &stream_encode_mt; 988*15ab8c86SJohn Marino next->end = &stream_encoder_mt_end; 989*15ab8c86SJohn Marino next->get_progress = &get_progress; 990*15ab8c86SJohn Marino // next->update = &stream_encoder_mt_update; 991*15ab8c86SJohn Marino 992*15ab8c86SJohn Marino next->coder->filters[0].id = LZMA_VLI_UNKNOWN; 993*15ab8c86SJohn Marino next->coder->index_encoder = LZMA_NEXT_CODER_INIT; 994*15ab8c86SJohn Marino next->coder->index = NULL; 995*15ab8c86SJohn Marino memzero(&next->coder->outq, sizeof(next->coder->outq)); 996*15ab8c86SJohn Marino next->coder->threads = NULL; 997*15ab8c86SJohn Marino next->coder->threads_max = 0; 998*15ab8c86SJohn Marino next->coder->threads_initialized = 0; 999*15ab8c86SJohn Marino } 1000*15ab8c86SJohn Marino 1001*15ab8c86SJohn Marino // Basic initializations 1002*15ab8c86SJohn Marino next->coder->sequence = SEQ_STREAM_HEADER; 1003*15ab8c86SJohn Marino next->coder->block_size = (size_t)(block_size); 1004*15ab8c86SJohn Marino next->coder->thread_error = LZMA_OK; 1005*15ab8c86SJohn Marino next->coder->thr = NULL; 1006*15ab8c86SJohn Marino 1007*15ab8c86SJohn Marino // Allocate the thread-specific base structures. 1008*15ab8c86SJohn Marino assert(options->threads > 0); 1009*15ab8c86SJohn Marino if (next->coder->threads_max != options->threads) { 1010*15ab8c86SJohn Marino threads_end(next->coder, allocator); 1011*15ab8c86SJohn Marino 1012*15ab8c86SJohn Marino next->coder->threads = NULL; 1013*15ab8c86SJohn Marino next->coder->threads_max = 0; 1014*15ab8c86SJohn Marino 1015*15ab8c86SJohn Marino next->coder->threads_initialized = 0; 1016*15ab8c86SJohn Marino next->coder->threads_free = NULL; 1017*15ab8c86SJohn Marino 1018*15ab8c86SJohn Marino next->coder->threads = lzma_alloc( 1019*15ab8c86SJohn Marino options->threads * sizeof(worker_thread), 1020*15ab8c86SJohn Marino allocator); 1021*15ab8c86SJohn Marino if (next->coder->threads == NULL) 1022*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 1023*15ab8c86SJohn Marino 1024*15ab8c86SJohn Marino next->coder->threads_max = options->threads; 1025*15ab8c86SJohn Marino } else { 1026*15ab8c86SJohn Marino // Reuse the old structures and threads. Tell the running 1027*15ab8c86SJohn Marino // threads to stop and wait until they have stopped. 1028*15ab8c86SJohn Marino threads_stop(next->coder, true); 1029*15ab8c86SJohn Marino } 1030*15ab8c86SJohn Marino 1031*15ab8c86SJohn Marino // Output queue 1032*15ab8c86SJohn Marino return_if_error(lzma_outq_init(&next->coder->outq, allocator, 1033*15ab8c86SJohn Marino outbuf_size_max, options->threads)); 1034*15ab8c86SJohn Marino 1035*15ab8c86SJohn Marino // Timeout 1036*15ab8c86SJohn Marino next->coder->timeout = options->timeout; 1037*15ab8c86SJohn Marino 1038*15ab8c86SJohn Marino // Free the old filter chain and copy the new one. 1039*15ab8c86SJohn Marino for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i) 1040*15ab8c86SJohn Marino lzma_free(next->coder->filters[i].options, allocator); 1041*15ab8c86SJohn Marino 1042*15ab8c86SJohn Marino return_if_error(lzma_filters_copy( 1043*15ab8c86SJohn Marino filters, next->coder->filters, allocator)); 1044*15ab8c86SJohn Marino 1045*15ab8c86SJohn Marino // Index 1046*15ab8c86SJohn Marino lzma_index_end(next->coder->index, allocator); 1047*15ab8c86SJohn Marino next->coder->index = lzma_index_init(allocator); 1048*15ab8c86SJohn Marino if (next->coder->index == NULL) 1049*15ab8c86SJohn Marino return LZMA_MEM_ERROR; 1050*15ab8c86SJohn Marino 1051*15ab8c86SJohn Marino // Stream Header 1052*15ab8c86SJohn Marino next->coder->stream_flags.version = 0; 1053*15ab8c86SJohn Marino next->coder->stream_flags.check = options->check; 1054*15ab8c86SJohn Marino return_if_error(lzma_stream_header_encode( 1055*15ab8c86SJohn Marino &next->coder->stream_flags, next->coder->header)); 1056*15ab8c86SJohn Marino 1057*15ab8c86SJohn Marino next->coder->header_pos = 0; 1058*15ab8c86SJohn Marino 1059*15ab8c86SJohn Marino // Progress info 1060*15ab8c86SJohn Marino next->coder->progress_in = 0; 1061*15ab8c86SJohn Marino next->coder->progress_out = LZMA_STREAM_HEADER_SIZE; 1062*15ab8c86SJohn Marino 1063*15ab8c86SJohn Marino return LZMA_OK; 1064*15ab8c86SJohn Marino } 1065*15ab8c86SJohn Marino 1066*15ab8c86SJohn Marino 1067*15ab8c86SJohn Marino extern LZMA_API(lzma_ret) 1068*15ab8c86SJohn Marino lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options) 1069*15ab8c86SJohn Marino { 1070*15ab8c86SJohn Marino lzma_next_strm_init(stream_encoder_mt_init, strm, options); 1071*15ab8c86SJohn Marino 1072*15ab8c86SJohn Marino strm->internal->supported_actions[LZMA_RUN] = true; 1073*15ab8c86SJohn Marino // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true; 1074*15ab8c86SJohn Marino strm->internal->supported_actions[LZMA_FULL_FLUSH] = true; 1075*15ab8c86SJohn Marino strm->internal->supported_actions[LZMA_FULL_BARRIER] = true; 1076*15ab8c86SJohn Marino strm->internal->supported_actions[LZMA_FINISH] = true; 1077*15ab8c86SJohn Marino 1078*15ab8c86SJohn Marino return LZMA_OK; 1079*15ab8c86SJohn Marino } 1080*15ab8c86SJohn Marino 1081*15ab8c86SJohn Marino 1082*15ab8c86SJohn Marino // This function name is a monster but it's consistent with the older 1083*15ab8c86SJohn Marino // monster names. :-( 31 chars is the max that C99 requires so in that 1084*15ab8c86SJohn Marino // sense it's not too long. ;-) 1085*15ab8c86SJohn Marino extern LZMA_API(uint64_t) 1086*15ab8c86SJohn Marino lzma_stream_encoder_mt_memusage(const lzma_mt *options) 1087*15ab8c86SJohn Marino { 1088*15ab8c86SJohn Marino lzma_options_easy easy; 1089*15ab8c86SJohn Marino const lzma_filter *filters; 1090*15ab8c86SJohn Marino uint64_t block_size; 1091*15ab8c86SJohn Marino uint64_t outbuf_size_max; 1092*15ab8c86SJohn Marino 1093*15ab8c86SJohn Marino if (get_options(options, &easy, &filters, &block_size, 1094*15ab8c86SJohn Marino &outbuf_size_max) != LZMA_OK) 1095*15ab8c86SJohn Marino return UINT64_MAX; 1096*15ab8c86SJohn Marino 1097*15ab8c86SJohn Marino // Memory usage of the input buffers 1098*15ab8c86SJohn Marino const uint64_t inbuf_memusage = options->threads * block_size; 1099*15ab8c86SJohn Marino 1100*15ab8c86SJohn Marino // Memory usage of the filter encoders 1101*15ab8c86SJohn Marino uint64_t filters_memusage = lzma_raw_encoder_memusage(filters); 1102*15ab8c86SJohn Marino if (filters_memusage == UINT64_MAX) 1103*15ab8c86SJohn Marino return UINT64_MAX; 1104*15ab8c86SJohn Marino 1105*15ab8c86SJohn Marino filters_memusage *= options->threads; 1106*15ab8c86SJohn Marino 1107*15ab8c86SJohn Marino // Memory usage of the output queue 1108*15ab8c86SJohn Marino const uint64_t outq_memusage = lzma_outq_memusage( 1109*15ab8c86SJohn Marino outbuf_size_max, options->threads); 1110*15ab8c86SJohn Marino if (outq_memusage == UINT64_MAX) 1111*15ab8c86SJohn Marino return UINT64_MAX; 1112*15ab8c86SJohn Marino 1113*15ab8c86SJohn Marino // Sum them with overflow checking. 1114*15ab8c86SJohn Marino uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder) 1115*15ab8c86SJohn Marino + options->threads * sizeof(worker_thread); 1116*15ab8c86SJohn Marino 1117*15ab8c86SJohn Marino if (UINT64_MAX - total_memusage < inbuf_memusage) 1118*15ab8c86SJohn Marino return UINT64_MAX; 1119*15ab8c86SJohn Marino 1120*15ab8c86SJohn Marino total_memusage += inbuf_memusage; 1121*15ab8c86SJohn Marino 1122*15ab8c86SJohn Marino if (UINT64_MAX - total_memusage < filters_memusage) 1123*15ab8c86SJohn Marino return UINT64_MAX; 1124*15ab8c86SJohn Marino 1125*15ab8c86SJohn Marino total_memusage += filters_memusage; 1126*15ab8c86SJohn Marino 1127*15ab8c86SJohn Marino if (UINT64_MAX - total_memusage < outq_memusage) 1128*15ab8c86SJohn Marino return UINT64_MAX; 1129*15ab8c86SJohn Marino 1130*15ab8c86SJohn Marino return total_memusage + outq_memusage; 1131*15ab8c86SJohn Marino } 1132