1*0a6a1f1dSLionel Sambuc ///////////////////////////////////////////////////////////////////////////////
2*0a6a1f1dSLionel Sambuc //
3*0a6a1f1dSLionel Sambuc /// \file stream_encoder_mt.c
4*0a6a1f1dSLionel Sambuc /// \brief Multithreaded .xz Stream encoder
5*0a6a1f1dSLionel Sambuc //
6*0a6a1f1dSLionel Sambuc // Author: Lasse Collin
7*0a6a1f1dSLionel Sambuc //
8*0a6a1f1dSLionel Sambuc // This file has been put into the public domain.
9*0a6a1f1dSLionel Sambuc // You can do whatever you want with this file.
10*0a6a1f1dSLionel Sambuc //
11*0a6a1f1dSLionel Sambuc ///////////////////////////////////////////////////////////////////////////////
12*0a6a1f1dSLionel Sambuc
13*0a6a1f1dSLionel Sambuc #include "filter_encoder.h"
14*0a6a1f1dSLionel Sambuc #include "easy_preset.h"
15*0a6a1f1dSLionel Sambuc #include "block_encoder.h"
16*0a6a1f1dSLionel Sambuc #include "block_buffer_encoder.h"
17*0a6a1f1dSLionel Sambuc #include "index_encoder.h"
18*0a6a1f1dSLionel Sambuc #include "outqueue.h"
19*0a6a1f1dSLionel Sambuc
20*0a6a1f1dSLionel Sambuc
/// Largest Block size the encoder accepts. Capping the value this way makes
/// it simpler to guard against integer overflow when an unusually large
/// block size is requested.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


/// Life-cycle states of a worker thread.
///
/// The numeric order matters: the code checks `state <= THR_FINISH` to mean
/// "there is encoding work" and `state >= THR_STOP` to mean "the main thread
/// asked us to stop or exit". Keep the explicit values ascending.
typedef enum {
	/// Idle, waiting for the main thread to hand over work.
	THR_IDLE = 0,

	/// Encoding a Block; more input may still arrive.
	THR_RUN = 1,

	/// Encoding a Block; no further input will be added.
	THR_FINISH = 2,

	/// The main thread wants the worker to abandon its current work
	/// but keep running.
	THR_STOP = 3,

	/// The main thread wants the worker to terminate. Thread
	/// cancellation could do this too, but since the worker has to be
	/// stopped anyway, a state change is the simpler mechanism.
	THR_EXIT = 4,
} worker_state;
46*0a6a1f1dSLionel Sambuc
47*0a6a1f1dSLionel Sambuc
48*0a6a1f1dSLionel Sambuc typedef struct worker_thread_s worker_thread;
49*0a6a1f1dSLionel Sambuc struct worker_thread_s {
50*0a6a1f1dSLionel Sambuc worker_state state;
51*0a6a1f1dSLionel Sambuc
52*0a6a1f1dSLionel Sambuc /// Input buffer of coder->block_size bytes. The main thread will
53*0a6a1f1dSLionel Sambuc /// put new input into this and update in_size accordingly. Once
54*0a6a1f1dSLionel Sambuc /// no more input is coming, state will be set to THR_FINISH.
55*0a6a1f1dSLionel Sambuc uint8_t *in;
56*0a6a1f1dSLionel Sambuc
57*0a6a1f1dSLionel Sambuc /// Amount of data available in the input buffer. This is modified
58*0a6a1f1dSLionel Sambuc /// only by the main thread.
59*0a6a1f1dSLionel Sambuc size_t in_size;
60*0a6a1f1dSLionel Sambuc
61*0a6a1f1dSLionel Sambuc /// Output buffer for this thread. This is set by the main
62*0a6a1f1dSLionel Sambuc /// thread every time a new Block is started with this thread
63*0a6a1f1dSLionel Sambuc /// structure.
64*0a6a1f1dSLionel Sambuc lzma_outbuf *outbuf;
65*0a6a1f1dSLionel Sambuc
66*0a6a1f1dSLionel Sambuc /// Pointer to the main structure is needed when putting this
67*0a6a1f1dSLionel Sambuc /// thread back to the stack of free threads.
68*0a6a1f1dSLionel Sambuc lzma_coder *coder;
69*0a6a1f1dSLionel Sambuc
70*0a6a1f1dSLionel Sambuc /// The allocator is set by the main thread. Since a copy of the
71*0a6a1f1dSLionel Sambuc /// pointer is kept here, the application must not change the
72*0a6a1f1dSLionel Sambuc /// allocator before calling lzma_end().
73*0a6a1f1dSLionel Sambuc const lzma_allocator *allocator;
74*0a6a1f1dSLionel Sambuc
75*0a6a1f1dSLionel Sambuc /// Amount of uncompressed data that has already been compressed.
76*0a6a1f1dSLionel Sambuc uint64_t progress_in;
77*0a6a1f1dSLionel Sambuc
78*0a6a1f1dSLionel Sambuc /// Amount of compressed data that is ready.
79*0a6a1f1dSLionel Sambuc uint64_t progress_out;
80*0a6a1f1dSLionel Sambuc
81*0a6a1f1dSLionel Sambuc /// Block encoder
82*0a6a1f1dSLionel Sambuc lzma_next_coder block_encoder;
83*0a6a1f1dSLionel Sambuc
84*0a6a1f1dSLionel Sambuc /// Compression options for this Block
85*0a6a1f1dSLionel Sambuc lzma_block block_options;
86*0a6a1f1dSLionel Sambuc
87*0a6a1f1dSLionel Sambuc /// Next structure in the stack of free worker threads.
88*0a6a1f1dSLionel Sambuc worker_thread *next;
89*0a6a1f1dSLionel Sambuc
90*0a6a1f1dSLionel Sambuc mythread_mutex mutex;
91*0a6a1f1dSLionel Sambuc mythread_cond cond;
92*0a6a1f1dSLionel Sambuc
93*0a6a1f1dSLionel Sambuc /// The ID of this thread is used to join the thread
94*0a6a1f1dSLionel Sambuc /// when it's not needed anymore.
95*0a6a1f1dSLionel Sambuc mythread thread_id;
96*0a6a1f1dSLionel Sambuc };
97*0a6a1f1dSLionel Sambuc
98*0a6a1f1dSLionel Sambuc
99*0a6a1f1dSLionel Sambuc struct lzma_coder_s {
100*0a6a1f1dSLionel Sambuc enum {
101*0a6a1f1dSLionel Sambuc SEQ_STREAM_HEADER,
102*0a6a1f1dSLionel Sambuc SEQ_BLOCK,
103*0a6a1f1dSLionel Sambuc SEQ_INDEX,
104*0a6a1f1dSLionel Sambuc SEQ_STREAM_FOOTER,
105*0a6a1f1dSLionel Sambuc } sequence;
106*0a6a1f1dSLionel Sambuc
107*0a6a1f1dSLionel Sambuc /// Start a new Block every block_size bytes of input unless
108*0a6a1f1dSLionel Sambuc /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
109*0a6a1f1dSLionel Sambuc size_t block_size;
110*0a6a1f1dSLionel Sambuc
111*0a6a1f1dSLionel Sambuc /// The filter chain currently in use
112*0a6a1f1dSLionel Sambuc lzma_filter filters[LZMA_FILTERS_MAX + 1];
113*0a6a1f1dSLionel Sambuc
114*0a6a1f1dSLionel Sambuc
115*0a6a1f1dSLionel Sambuc /// Index to hold sizes of the Blocks
116*0a6a1f1dSLionel Sambuc lzma_index *index;
117*0a6a1f1dSLionel Sambuc
118*0a6a1f1dSLionel Sambuc /// Index encoder
119*0a6a1f1dSLionel Sambuc lzma_next_coder index_encoder;
120*0a6a1f1dSLionel Sambuc
121*0a6a1f1dSLionel Sambuc
122*0a6a1f1dSLionel Sambuc /// Stream Flags for encoding the Stream Header and Stream Footer.
123*0a6a1f1dSLionel Sambuc lzma_stream_flags stream_flags;
124*0a6a1f1dSLionel Sambuc
125*0a6a1f1dSLionel Sambuc /// Buffer to hold Stream Header and Stream Footer.
126*0a6a1f1dSLionel Sambuc uint8_t header[LZMA_STREAM_HEADER_SIZE];
127*0a6a1f1dSLionel Sambuc
128*0a6a1f1dSLionel Sambuc /// Read position in header[]
129*0a6a1f1dSLionel Sambuc size_t header_pos;
130*0a6a1f1dSLionel Sambuc
131*0a6a1f1dSLionel Sambuc
132*0a6a1f1dSLionel Sambuc /// Output buffer queue for compressed data
133*0a6a1f1dSLionel Sambuc lzma_outq outq;
134*0a6a1f1dSLionel Sambuc
135*0a6a1f1dSLionel Sambuc
136*0a6a1f1dSLionel Sambuc /// Maximum wait time if cannot use all the input and cannot
137*0a6a1f1dSLionel Sambuc /// fill the output buffer. This is in milliseconds.
138*0a6a1f1dSLionel Sambuc uint32_t timeout;
139*0a6a1f1dSLionel Sambuc
140*0a6a1f1dSLionel Sambuc
141*0a6a1f1dSLionel Sambuc /// Error code from a worker thread
142*0a6a1f1dSLionel Sambuc lzma_ret thread_error;
143*0a6a1f1dSLionel Sambuc
144*0a6a1f1dSLionel Sambuc /// Array of allocated thread-specific structures
145*0a6a1f1dSLionel Sambuc worker_thread *threads;
146*0a6a1f1dSLionel Sambuc
147*0a6a1f1dSLionel Sambuc /// Number of structures in "threads" above. This is also the
148*0a6a1f1dSLionel Sambuc /// number of threads that will be created at maximum.
149*0a6a1f1dSLionel Sambuc uint32_t threads_max;
150*0a6a1f1dSLionel Sambuc
151*0a6a1f1dSLionel Sambuc /// Number of thread structures that have been initialized, and
152*0a6a1f1dSLionel Sambuc /// thus the number of worker threads actually created so far.
153*0a6a1f1dSLionel Sambuc uint32_t threads_initialized;
154*0a6a1f1dSLionel Sambuc
155*0a6a1f1dSLionel Sambuc /// Stack of free threads. When a thread finishes, it puts itself
156*0a6a1f1dSLionel Sambuc /// back into this stack. This starts as empty because threads
157*0a6a1f1dSLionel Sambuc /// are created only when actually needed.
158*0a6a1f1dSLionel Sambuc worker_thread *threads_free;
159*0a6a1f1dSLionel Sambuc
160*0a6a1f1dSLionel Sambuc /// The most recent worker thread to which the main thread writes
161*0a6a1f1dSLionel Sambuc /// the new input from the application.
162*0a6a1f1dSLionel Sambuc worker_thread *thr;
163*0a6a1f1dSLionel Sambuc
164*0a6a1f1dSLionel Sambuc
165*0a6a1f1dSLionel Sambuc /// Amount of uncompressed data in Blocks that have already
166*0a6a1f1dSLionel Sambuc /// been finished.
167*0a6a1f1dSLionel Sambuc uint64_t progress_in;
168*0a6a1f1dSLionel Sambuc
169*0a6a1f1dSLionel Sambuc /// Amount of compressed data in Stream Header + Blocks that
170*0a6a1f1dSLionel Sambuc /// have already been finished.
171*0a6a1f1dSLionel Sambuc uint64_t progress_out;
172*0a6a1f1dSLionel Sambuc
173*0a6a1f1dSLionel Sambuc
174*0a6a1f1dSLionel Sambuc mythread_mutex mutex;
175*0a6a1f1dSLionel Sambuc mythread_cond cond;
176*0a6a1f1dSLionel Sambuc };
177*0a6a1f1dSLionel Sambuc
178*0a6a1f1dSLionel Sambuc
179*0a6a1f1dSLionel Sambuc /// Tell the main thread that something has gone wrong.
180*0a6a1f1dSLionel Sambuc static void
worker_error(worker_thread * thr,lzma_ret ret)181*0a6a1f1dSLionel Sambuc worker_error(worker_thread *thr, lzma_ret ret)
182*0a6a1f1dSLionel Sambuc {
183*0a6a1f1dSLionel Sambuc assert(ret != LZMA_OK);
184*0a6a1f1dSLionel Sambuc assert(ret != LZMA_STREAM_END);
185*0a6a1f1dSLionel Sambuc
186*0a6a1f1dSLionel Sambuc mythread_sync(thr->coder->mutex) {
187*0a6a1f1dSLionel Sambuc if (thr->coder->thread_error == LZMA_OK)
188*0a6a1f1dSLionel Sambuc thr->coder->thread_error = ret;
189*0a6a1f1dSLionel Sambuc
190*0a6a1f1dSLionel Sambuc mythread_cond_signal(&thr->coder->cond);
191*0a6a1f1dSLionel Sambuc }
192*0a6a1f1dSLionel Sambuc
193*0a6a1f1dSLionel Sambuc return;
194*0a6a1f1dSLionel Sambuc }
195*0a6a1f1dSLionel Sambuc
196*0a6a1f1dSLionel Sambuc
197*0a6a1f1dSLionel Sambuc static worker_state
worker_encode(worker_thread * thr,worker_state state)198*0a6a1f1dSLionel Sambuc worker_encode(worker_thread *thr, worker_state state)
199*0a6a1f1dSLionel Sambuc {
200*0a6a1f1dSLionel Sambuc assert(thr->progress_in == 0);
201*0a6a1f1dSLionel Sambuc assert(thr->progress_out == 0);
202*0a6a1f1dSLionel Sambuc
203*0a6a1f1dSLionel Sambuc // Set the Block options.
204*0a6a1f1dSLionel Sambuc thr->block_options = (lzma_block){
205*0a6a1f1dSLionel Sambuc .version = 0,
206*0a6a1f1dSLionel Sambuc .check = thr->coder->stream_flags.check,
207*0a6a1f1dSLionel Sambuc .compressed_size = thr->coder->outq.buf_size_max,
208*0a6a1f1dSLionel Sambuc .uncompressed_size = thr->coder->block_size,
209*0a6a1f1dSLionel Sambuc
210*0a6a1f1dSLionel Sambuc // TODO: To allow changing the filter chain, the filters
211*0a6a1f1dSLionel Sambuc // array must be copied to each worker_thread.
212*0a6a1f1dSLionel Sambuc .filters = thr->coder->filters,
213*0a6a1f1dSLionel Sambuc };
214*0a6a1f1dSLionel Sambuc
215*0a6a1f1dSLionel Sambuc // Calculate maximum size of the Block Header. This amount is
216*0a6a1f1dSLionel Sambuc // reserved in the beginning of the buffer so that Block Header
217*0a6a1f1dSLionel Sambuc // along with Compressed Size and Uncompressed Size can be
218*0a6a1f1dSLionel Sambuc // written there.
219*0a6a1f1dSLionel Sambuc lzma_ret ret = lzma_block_header_size(&thr->block_options);
220*0a6a1f1dSLionel Sambuc if (ret != LZMA_OK) {
221*0a6a1f1dSLionel Sambuc worker_error(thr, ret);
222*0a6a1f1dSLionel Sambuc return THR_STOP;
223*0a6a1f1dSLionel Sambuc }
224*0a6a1f1dSLionel Sambuc
225*0a6a1f1dSLionel Sambuc // Initialize the Block encoder.
226*0a6a1f1dSLionel Sambuc ret = lzma_block_encoder_init(&thr->block_encoder,
227*0a6a1f1dSLionel Sambuc thr->allocator, &thr->block_options);
228*0a6a1f1dSLionel Sambuc if (ret != LZMA_OK) {
229*0a6a1f1dSLionel Sambuc worker_error(thr, ret);
230*0a6a1f1dSLionel Sambuc return THR_STOP;
231*0a6a1f1dSLionel Sambuc }
232*0a6a1f1dSLionel Sambuc
233*0a6a1f1dSLionel Sambuc size_t in_pos = 0;
234*0a6a1f1dSLionel Sambuc size_t in_size = 0;
235*0a6a1f1dSLionel Sambuc
236*0a6a1f1dSLionel Sambuc thr->outbuf->size = thr->block_options.header_size;
237*0a6a1f1dSLionel Sambuc const size_t out_size = thr->coder->outq.buf_size_max;
238*0a6a1f1dSLionel Sambuc
239*0a6a1f1dSLionel Sambuc do {
240*0a6a1f1dSLionel Sambuc mythread_sync(thr->mutex) {
241*0a6a1f1dSLionel Sambuc // Store in_pos and out_pos into *thr so that
242*0a6a1f1dSLionel Sambuc // an application may read them via
243*0a6a1f1dSLionel Sambuc // lzma_get_progress() to get progress information.
244*0a6a1f1dSLionel Sambuc //
245*0a6a1f1dSLionel Sambuc // NOTE: These aren't updated when the encoding
246*0a6a1f1dSLionel Sambuc // finishes. Instead, the final values are taken
247*0a6a1f1dSLionel Sambuc // later from thr->outbuf.
248*0a6a1f1dSLionel Sambuc thr->progress_in = in_pos;
249*0a6a1f1dSLionel Sambuc thr->progress_out = thr->outbuf->size;
250*0a6a1f1dSLionel Sambuc
251*0a6a1f1dSLionel Sambuc while (in_size == thr->in_size
252*0a6a1f1dSLionel Sambuc && thr->state == THR_RUN)
253*0a6a1f1dSLionel Sambuc mythread_cond_wait(&thr->cond, &thr->mutex);
254*0a6a1f1dSLionel Sambuc
255*0a6a1f1dSLionel Sambuc state = thr->state;
256*0a6a1f1dSLionel Sambuc in_size = thr->in_size;
257*0a6a1f1dSLionel Sambuc }
258*0a6a1f1dSLionel Sambuc
259*0a6a1f1dSLionel Sambuc // Return if we were asked to stop or exit.
260*0a6a1f1dSLionel Sambuc if (state >= THR_STOP)
261*0a6a1f1dSLionel Sambuc return state;
262*0a6a1f1dSLionel Sambuc
263*0a6a1f1dSLionel Sambuc lzma_action action = state == THR_FINISH
264*0a6a1f1dSLionel Sambuc ? LZMA_FINISH : LZMA_RUN;
265*0a6a1f1dSLionel Sambuc
266*0a6a1f1dSLionel Sambuc // Limit the amount of input given to the Block encoder
267*0a6a1f1dSLionel Sambuc // at once. This way this thread can react fairly quickly
268*0a6a1f1dSLionel Sambuc // if the main thread wants us to stop or exit.
269*0a6a1f1dSLionel Sambuc static const size_t in_chunk_max = 16384;
270*0a6a1f1dSLionel Sambuc size_t in_limit = in_size;
271*0a6a1f1dSLionel Sambuc if (in_size - in_pos > in_chunk_max) {
272*0a6a1f1dSLionel Sambuc in_limit = in_pos + in_chunk_max;
273*0a6a1f1dSLionel Sambuc action = LZMA_RUN;
274*0a6a1f1dSLionel Sambuc }
275*0a6a1f1dSLionel Sambuc
276*0a6a1f1dSLionel Sambuc ret = thr->block_encoder.code(
277*0a6a1f1dSLionel Sambuc thr->block_encoder.coder, thr->allocator,
278*0a6a1f1dSLionel Sambuc thr->in, &in_pos, in_limit, thr->outbuf->buf,
279*0a6a1f1dSLionel Sambuc &thr->outbuf->size, out_size, action);
280*0a6a1f1dSLionel Sambuc } while (ret == LZMA_OK && thr->outbuf->size < out_size);
281*0a6a1f1dSLionel Sambuc
282*0a6a1f1dSLionel Sambuc switch (ret) {
283*0a6a1f1dSLionel Sambuc case LZMA_STREAM_END:
284*0a6a1f1dSLionel Sambuc assert(state == THR_FINISH);
285*0a6a1f1dSLionel Sambuc
286*0a6a1f1dSLionel Sambuc // Encode the Block Header. By doing it after
287*0a6a1f1dSLionel Sambuc // the compression, we can store the Compressed Size
288*0a6a1f1dSLionel Sambuc // and Uncompressed Size fields.
289*0a6a1f1dSLionel Sambuc ret = lzma_block_header_encode(&thr->block_options,
290*0a6a1f1dSLionel Sambuc thr->outbuf->buf);
291*0a6a1f1dSLionel Sambuc if (ret != LZMA_OK) {
292*0a6a1f1dSLionel Sambuc worker_error(thr, ret);
293*0a6a1f1dSLionel Sambuc return THR_STOP;
294*0a6a1f1dSLionel Sambuc }
295*0a6a1f1dSLionel Sambuc
296*0a6a1f1dSLionel Sambuc break;
297*0a6a1f1dSLionel Sambuc
298*0a6a1f1dSLionel Sambuc case LZMA_OK:
299*0a6a1f1dSLionel Sambuc // The data was incompressible. Encode it using uncompressed
300*0a6a1f1dSLionel Sambuc // LZMA2 chunks.
301*0a6a1f1dSLionel Sambuc //
302*0a6a1f1dSLionel Sambuc // First wait that we have gotten all the input.
303*0a6a1f1dSLionel Sambuc mythread_sync(thr->mutex) {
304*0a6a1f1dSLionel Sambuc while (thr->state == THR_RUN)
305*0a6a1f1dSLionel Sambuc mythread_cond_wait(&thr->cond, &thr->mutex);
306*0a6a1f1dSLionel Sambuc
307*0a6a1f1dSLionel Sambuc state = thr->state;
308*0a6a1f1dSLionel Sambuc in_size = thr->in_size;
309*0a6a1f1dSLionel Sambuc }
310*0a6a1f1dSLionel Sambuc
311*0a6a1f1dSLionel Sambuc if (state >= THR_STOP)
312*0a6a1f1dSLionel Sambuc return state;
313*0a6a1f1dSLionel Sambuc
314*0a6a1f1dSLionel Sambuc // Do the encoding. This takes care of the Block Header too.
315*0a6a1f1dSLionel Sambuc thr->outbuf->size = 0;
316*0a6a1f1dSLionel Sambuc ret = lzma_block_uncomp_encode(&thr->block_options,
317*0a6a1f1dSLionel Sambuc thr->in, in_size, thr->outbuf->buf,
318*0a6a1f1dSLionel Sambuc &thr->outbuf->size, out_size);
319*0a6a1f1dSLionel Sambuc
320*0a6a1f1dSLionel Sambuc // It shouldn't fail.
321*0a6a1f1dSLionel Sambuc if (ret != LZMA_OK) {
322*0a6a1f1dSLionel Sambuc worker_error(thr, LZMA_PROG_ERROR);
323*0a6a1f1dSLionel Sambuc return THR_STOP;
324*0a6a1f1dSLionel Sambuc }
325*0a6a1f1dSLionel Sambuc
326*0a6a1f1dSLionel Sambuc break;
327*0a6a1f1dSLionel Sambuc
328*0a6a1f1dSLionel Sambuc default:
329*0a6a1f1dSLionel Sambuc worker_error(thr, ret);
330*0a6a1f1dSLionel Sambuc return THR_STOP;
331*0a6a1f1dSLionel Sambuc }
332*0a6a1f1dSLionel Sambuc
333*0a6a1f1dSLionel Sambuc // Set the size information that will be read by the main thread
334*0a6a1f1dSLionel Sambuc // to write the Index field.
335*0a6a1f1dSLionel Sambuc thr->outbuf->unpadded_size
336*0a6a1f1dSLionel Sambuc = lzma_block_unpadded_size(&thr->block_options);
337*0a6a1f1dSLionel Sambuc assert(thr->outbuf->unpadded_size != 0);
338*0a6a1f1dSLionel Sambuc thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
339*0a6a1f1dSLionel Sambuc
340*0a6a1f1dSLionel Sambuc return THR_FINISH;
341*0a6a1f1dSLionel Sambuc }
342*0a6a1f1dSLionel Sambuc
343*0a6a1f1dSLionel Sambuc
344*0a6a1f1dSLionel Sambuc static MYTHREAD_RET_TYPE
worker_start(void * thr_ptr)345*0a6a1f1dSLionel Sambuc worker_start(void *thr_ptr)
346*0a6a1f1dSLionel Sambuc {
347*0a6a1f1dSLionel Sambuc worker_thread *thr = thr_ptr;
348*0a6a1f1dSLionel Sambuc worker_state state = THR_IDLE; // Init to silence a warning
349*0a6a1f1dSLionel Sambuc
350*0a6a1f1dSLionel Sambuc while (true) {
351*0a6a1f1dSLionel Sambuc // Wait for work.
352*0a6a1f1dSLionel Sambuc mythread_sync(thr->mutex) {
353*0a6a1f1dSLionel Sambuc while (true) {
354*0a6a1f1dSLionel Sambuc // The thread is already idle so if we are
355*0a6a1f1dSLionel Sambuc // requested to stop, just set the state.
356*0a6a1f1dSLionel Sambuc if (thr->state == THR_STOP) {
357*0a6a1f1dSLionel Sambuc thr->state = THR_IDLE;
358*0a6a1f1dSLionel Sambuc mythread_cond_signal(&thr->cond);
359*0a6a1f1dSLionel Sambuc }
360*0a6a1f1dSLionel Sambuc
361*0a6a1f1dSLionel Sambuc state = thr->state;
362*0a6a1f1dSLionel Sambuc if (state != THR_IDLE)
363*0a6a1f1dSLionel Sambuc break;
364*0a6a1f1dSLionel Sambuc
365*0a6a1f1dSLionel Sambuc mythread_cond_wait(&thr->cond, &thr->mutex);
366*0a6a1f1dSLionel Sambuc }
367*0a6a1f1dSLionel Sambuc }
368*0a6a1f1dSLionel Sambuc
369*0a6a1f1dSLionel Sambuc assert(state != THR_IDLE);
370*0a6a1f1dSLionel Sambuc assert(state != THR_STOP);
371*0a6a1f1dSLionel Sambuc
372*0a6a1f1dSLionel Sambuc if (state <= THR_FINISH)
373*0a6a1f1dSLionel Sambuc state = worker_encode(thr, state);
374*0a6a1f1dSLionel Sambuc
375*0a6a1f1dSLionel Sambuc if (state == THR_EXIT)
376*0a6a1f1dSLionel Sambuc break;
377*0a6a1f1dSLionel Sambuc
378*0a6a1f1dSLionel Sambuc // Mark the thread as idle unless the main thread has
379*0a6a1f1dSLionel Sambuc // told us to exit. Signal is needed for the case
380*0a6a1f1dSLionel Sambuc // where the main thread is waiting for the threads to stop.
381*0a6a1f1dSLionel Sambuc mythread_sync(thr->mutex) {
382*0a6a1f1dSLionel Sambuc if (thr->state != THR_EXIT) {
383*0a6a1f1dSLionel Sambuc thr->state = THR_IDLE;
384*0a6a1f1dSLionel Sambuc mythread_cond_signal(&thr->cond);
385*0a6a1f1dSLionel Sambuc }
386*0a6a1f1dSLionel Sambuc }
387*0a6a1f1dSLionel Sambuc
388*0a6a1f1dSLionel Sambuc mythread_sync(thr->coder->mutex) {
389*0a6a1f1dSLionel Sambuc // Mark the output buffer as finished if
390*0a6a1f1dSLionel Sambuc // no errors occurred.
391*0a6a1f1dSLionel Sambuc thr->outbuf->finished = state == THR_FINISH;
392*0a6a1f1dSLionel Sambuc
393*0a6a1f1dSLionel Sambuc // Update the main progress info.
394*0a6a1f1dSLionel Sambuc thr->coder->progress_in
395*0a6a1f1dSLionel Sambuc += thr->outbuf->uncompressed_size;
396*0a6a1f1dSLionel Sambuc thr->coder->progress_out += thr->outbuf->size;
397*0a6a1f1dSLionel Sambuc thr->progress_in = 0;
398*0a6a1f1dSLionel Sambuc thr->progress_out = 0;
399*0a6a1f1dSLionel Sambuc
400*0a6a1f1dSLionel Sambuc // Return this thread to the stack of free threads.
401*0a6a1f1dSLionel Sambuc thr->next = thr->coder->threads_free;
402*0a6a1f1dSLionel Sambuc thr->coder->threads_free = thr;
403*0a6a1f1dSLionel Sambuc
404*0a6a1f1dSLionel Sambuc mythread_cond_signal(&thr->coder->cond);
405*0a6a1f1dSLionel Sambuc }
406*0a6a1f1dSLionel Sambuc }
407*0a6a1f1dSLionel Sambuc
408*0a6a1f1dSLionel Sambuc // Exiting, free the resources.
409*0a6a1f1dSLionel Sambuc mythread_mutex_destroy(&thr->mutex);
410*0a6a1f1dSLionel Sambuc mythread_cond_destroy(&thr->cond);
411*0a6a1f1dSLionel Sambuc
412*0a6a1f1dSLionel Sambuc lzma_next_end(&thr->block_encoder, thr->allocator);
413*0a6a1f1dSLionel Sambuc lzma_free(thr->in, thr->allocator);
414*0a6a1f1dSLionel Sambuc return MYTHREAD_RET_VALUE;
415*0a6a1f1dSLionel Sambuc }
416*0a6a1f1dSLionel Sambuc
417*0a6a1f1dSLionel Sambuc
418*0a6a1f1dSLionel Sambuc /// Make the threads stop but not exit. Optionally wait for them to stop.
419*0a6a1f1dSLionel Sambuc static void
threads_stop(lzma_coder * coder,bool wait_for_threads)420*0a6a1f1dSLionel Sambuc threads_stop(lzma_coder *coder, bool wait_for_threads)
421*0a6a1f1dSLionel Sambuc {
422*0a6a1f1dSLionel Sambuc // Tell the threads to stop.
423*0a6a1f1dSLionel Sambuc for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
424*0a6a1f1dSLionel Sambuc mythread_sync(coder->threads[i].mutex) {
425*0a6a1f1dSLionel Sambuc coder->threads[i].state = THR_STOP;
426*0a6a1f1dSLionel Sambuc mythread_cond_signal(&coder->threads[i].cond);
427*0a6a1f1dSLionel Sambuc }
428*0a6a1f1dSLionel Sambuc }
429*0a6a1f1dSLionel Sambuc
430*0a6a1f1dSLionel Sambuc if (!wait_for_threads)
431*0a6a1f1dSLionel Sambuc return;
432*0a6a1f1dSLionel Sambuc
433*0a6a1f1dSLionel Sambuc // Wait for the threads to settle in the idle state.
434*0a6a1f1dSLionel Sambuc for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
435*0a6a1f1dSLionel Sambuc mythread_sync(coder->threads[i].mutex) {
436*0a6a1f1dSLionel Sambuc while (coder->threads[i].state != THR_IDLE)
437*0a6a1f1dSLionel Sambuc mythread_cond_wait(&coder->threads[i].cond,
438*0a6a1f1dSLionel Sambuc &coder->threads[i].mutex);
439*0a6a1f1dSLionel Sambuc }
440*0a6a1f1dSLionel Sambuc }
441*0a6a1f1dSLionel Sambuc
442*0a6a1f1dSLionel Sambuc return;
443*0a6a1f1dSLionel Sambuc }
444*0a6a1f1dSLionel Sambuc
445*0a6a1f1dSLionel Sambuc
446*0a6a1f1dSLionel Sambuc /// Stop the threads and free the resources associated with them.
447*0a6a1f1dSLionel Sambuc /// Wait until the threads have exited.
448*0a6a1f1dSLionel Sambuc static void
threads_end(lzma_coder * coder,const lzma_allocator * allocator)449*0a6a1f1dSLionel Sambuc threads_end(lzma_coder *coder, const lzma_allocator *allocator)
450*0a6a1f1dSLionel Sambuc {
451*0a6a1f1dSLionel Sambuc for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
452*0a6a1f1dSLionel Sambuc mythread_sync(coder->threads[i].mutex) {
453*0a6a1f1dSLionel Sambuc coder->threads[i].state = THR_EXIT;
454*0a6a1f1dSLionel Sambuc mythread_cond_signal(&coder->threads[i].cond);
455*0a6a1f1dSLionel Sambuc }
456*0a6a1f1dSLionel Sambuc }
457*0a6a1f1dSLionel Sambuc
458*0a6a1f1dSLionel Sambuc for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
459*0a6a1f1dSLionel Sambuc int ret = mythread_join(coder->threads[i].thread_id);
460*0a6a1f1dSLionel Sambuc assert(ret == 0);
461*0a6a1f1dSLionel Sambuc (void)ret;
462*0a6a1f1dSLionel Sambuc }
463*0a6a1f1dSLionel Sambuc
464*0a6a1f1dSLionel Sambuc lzma_free(coder->threads, allocator);
465*0a6a1f1dSLionel Sambuc return;
466*0a6a1f1dSLionel Sambuc }
467*0a6a1f1dSLionel Sambuc
468*0a6a1f1dSLionel Sambuc
469*0a6a1f1dSLionel Sambuc /// Initialize a new worker_thread structure and create a new thread.
470*0a6a1f1dSLionel Sambuc static lzma_ret
initialize_new_thread(lzma_coder * coder,const lzma_allocator * allocator)471*0a6a1f1dSLionel Sambuc initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
472*0a6a1f1dSLionel Sambuc {
473*0a6a1f1dSLionel Sambuc worker_thread *thr = &coder->threads[coder->threads_initialized];
474*0a6a1f1dSLionel Sambuc
475*0a6a1f1dSLionel Sambuc thr->in = lzma_alloc(coder->block_size, allocator);
476*0a6a1f1dSLionel Sambuc if (thr->in == NULL)
477*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
478*0a6a1f1dSLionel Sambuc
479*0a6a1f1dSLionel Sambuc if (mythread_mutex_init(&thr->mutex))
480*0a6a1f1dSLionel Sambuc goto error_mutex;
481*0a6a1f1dSLionel Sambuc
482*0a6a1f1dSLionel Sambuc if (mythread_cond_init(&thr->cond))
483*0a6a1f1dSLionel Sambuc goto error_cond;
484*0a6a1f1dSLionel Sambuc
485*0a6a1f1dSLionel Sambuc thr->state = THR_IDLE;
486*0a6a1f1dSLionel Sambuc thr->allocator = allocator;
487*0a6a1f1dSLionel Sambuc thr->coder = coder;
488*0a6a1f1dSLionel Sambuc thr->progress_in = 0;
489*0a6a1f1dSLionel Sambuc thr->progress_out = 0;
490*0a6a1f1dSLionel Sambuc thr->block_encoder = LZMA_NEXT_CODER_INIT;
491*0a6a1f1dSLionel Sambuc
492*0a6a1f1dSLionel Sambuc if (mythread_create(&thr->thread_id, &worker_start, thr))
493*0a6a1f1dSLionel Sambuc goto error_thread;
494*0a6a1f1dSLionel Sambuc
495*0a6a1f1dSLionel Sambuc ++coder->threads_initialized;
496*0a6a1f1dSLionel Sambuc coder->thr = thr;
497*0a6a1f1dSLionel Sambuc
498*0a6a1f1dSLionel Sambuc return LZMA_OK;
499*0a6a1f1dSLionel Sambuc
500*0a6a1f1dSLionel Sambuc error_thread:
501*0a6a1f1dSLionel Sambuc mythread_cond_destroy(&thr->cond);
502*0a6a1f1dSLionel Sambuc
503*0a6a1f1dSLionel Sambuc error_cond:
504*0a6a1f1dSLionel Sambuc mythread_mutex_destroy(&thr->mutex);
505*0a6a1f1dSLionel Sambuc
506*0a6a1f1dSLionel Sambuc error_mutex:
507*0a6a1f1dSLionel Sambuc lzma_free(thr->in, allocator);
508*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
509*0a6a1f1dSLionel Sambuc }
510*0a6a1f1dSLionel Sambuc
511*0a6a1f1dSLionel Sambuc
512*0a6a1f1dSLionel Sambuc static lzma_ret
get_thread(lzma_coder * coder,const lzma_allocator * allocator)513*0a6a1f1dSLionel Sambuc get_thread(lzma_coder *coder, const lzma_allocator *allocator)
514*0a6a1f1dSLionel Sambuc {
515*0a6a1f1dSLionel Sambuc // If there are no free output subqueues, there is no
516*0a6a1f1dSLionel Sambuc // point to try getting a thread.
517*0a6a1f1dSLionel Sambuc if (!lzma_outq_has_buf(&coder->outq))
518*0a6a1f1dSLionel Sambuc return LZMA_OK;
519*0a6a1f1dSLionel Sambuc
520*0a6a1f1dSLionel Sambuc // If there is a free structure on the stack, use it.
521*0a6a1f1dSLionel Sambuc mythread_sync(coder->mutex) {
522*0a6a1f1dSLionel Sambuc if (coder->threads_free != NULL) {
523*0a6a1f1dSLionel Sambuc coder->thr = coder->threads_free;
524*0a6a1f1dSLionel Sambuc coder->threads_free = coder->threads_free->next;
525*0a6a1f1dSLionel Sambuc }
526*0a6a1f1dSLionel Sambuc }
527*0a6a1f1dSLionel Sambuc
528*0a6a1f1dSLionel Sambuc if (coder->thr == NULL) {
529*0a6a1f1dSLionel Sambuc // If there are no uninitialized structures left, return.
530*0a6a1f1dSLionel Sambuc if (coder->threads_initialized == coder->threads_max)
531*0a6a1f1dSLionel Sambuc return LZMA_OK;
532*0a6a1f1dSLionel Sambuc
533*0a6a1f1dSLionel Sambuc // Initialize a new thread.
534*0a6a1f1dSLionel Sambuc return_if_error(initialize_new_thread(coder, allocator));
535*0a6a1f1dSLionel Sambuc }
536*0a6a1f1dSLionel Sambuc
537*0a6a1f1dSLionel Sambuc // Reset the parts of the thread state that have to be done
538*0a6a1f1dSLionel Sambuc // in the main thread.
539*0a6a1f1dSLionel Sambuc mythread_sync(coder->thr->mutex) {
540*0a6a1f1dSLionel Sambuc coder->thr->state = THR_RUN;
541*0a6a1f1dSLionel Sambuc coder->thr->in_size = 0;
542*0a6a1f1dSLionel Sambuc coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
543*0a6a1f1dSLionel Sambuc mythread_cond_signal(&coder->thr->cond);
544*0a6a1f1dSLionel Sambuc }
545*0a6a1f1dSLionel Sambuc
546*0a6a1f1dSLionel Sambuc return LZMA_OK;
547*0a6a1f1dSLionel Sambuc }
548*0a6a1f1dSLionel Sambuc
549*0a6a1f1dSLionel Sambuc
550*0a6a1f1dSLionel Sambuc static lzma_ret
stream_encode_in(lzma_coder * coder,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,lzma_action action)551*0a6a1f1dSLionel Sambuc stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
552*0a6a1f1dSLionel Sambuc const uint8_t *restrict in, size_t *restrict in_pos,
553*0a6a1f1dSLionel Sambuc size_t in_size, lzma_action action)
554*0a6a1f1dSLionel Sambuc {
555*0a6a1f1dSLionel Sambuc while (*in_pos < in_size
556*0a6a1f1dSLionel Sambuc || (coder->thr != NULL && action != LZMA_RUN)) {
557*0a6a1f1dSLionel Sambuc if (coder->thr == NULL) {
558*0a6a1f1dSLionel Sambuc // Get a new thread.
559*0a6a1f1dSLionel Sambuc const lzma_ret ret = get_thread(coder, allocator);
560*0a6a1f1dSLionel Sambuc if (coder->thr == NULL)
561*0a6a1f1dSLionel Sambuc return ret;
562*0a6a1f1dSLionel Sambuc }
563*0a6a1f1dSLionel Sambuc
564*0a6a1f1dSLionel Sambuc // Copy the input data to thread's buffer.
565*0a6a1f1dSLionel Sambuc size_t thr_in_size = coder->thr->in_size;
566*0a6a1f1dSLionel Sambuc lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
567*0a6a1f1dSLionel Sambuc &thr_in_size, coder->block_size);
568*0a6a1f1dSLionel Sambuc
569*0a6a1f1dSLionel Sambuc // Tell the Block encoder to finish if
570*0a6a1f1dSLionel Sambuc // - it has got block_size bytes of input; or
571*0a6a1f1dSLionel Sambuc // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
572*0a6a1f1dSLionel Sambuc // or LZMA_FULL_BARRIER was used.
573*0a6a1f1dSLionel Sambuc //
574*0a6a1f1dSLionel Sambuc // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
575*0a6a1f1dSLionel Sambuc const bool finish = thr_in_size == coder->block_size
576*0a6a1f1dSLionel Sambuc || (*in_pos == in_size && action != LZMA_RUN);
577*0a6a1f1dSLionel Sambuc
578*0a6a1f1dSLionel Sambuc bool block_error = false;
579*0a6a1f1dSLionel Sambuc
580*0a6a1f1dSLionel Sambuc mythread_sync(coder->thr->mutex) {
581*0a6a1f1dSLionel Sambuc if (coder->thr->state == THR_IDLE) {
582*0a6a1f1dSLionel Sambuc // Something has gone wrong with the Block
583*0a6a1f1dSLionel Sambuc // encoder. It has set coder->thread_error
584*0a6a1f1dSLionel Sambuc // which we will read a few lines later.
585*0a6a1f1dSLionel Sambuc block_error = true;
586*0a6a1f1dSLionel Sambuc } else {
587*0a6a1f1dSLionel Sambuc // Tell the Block encoder its new amount
588*0a6a1f1dSLionel Sambuc // of input and update the state if needed.
589*0a6a1f1dSLionel Sambuc coder->thr->in_size = thr_in_size;
590*0a6a1f1dSLionel Sambuc
591*0a6a1f1dSLionel Sambuc if (finish)
592*0a6a1f1dSLionel Sambuc coder->thr->state = THR_FINISH;
593*0a6a1f1dSLionel Sambuc
594*0a6a1f1dSLionel Sambuc mythread_cond_signal(&coder->thr->cond);
595*0a6a1f1dSLionel Sambuc }
596*0a6a1f1dSLionel Sambuc }
597*0a6a1f1dSLionel Sambuc
598*0a6a1f1dSLionel Sambuc if (block_error) {
599*0a6a1f1dSLionel Sambuc lzma_ret ret;
600*0a6a1f1dSLionel Sambuc
601*0a6a1f1dSLionel Sambuc mythread_sync(coder->mutex) {
602*0a6a1f1dSLionel Sambuc ret = coder->thread_error;
603*0a6a1f1dSLionel Sambuc }
604*0a6a1f1dSLionel Sambuc
605*0a6a1f1dSLionel Sambuc return ret;
606*0a6a1f1dSLionel Sambuc }
607*0a6a1f1dSLionel Sambuc
608*0a6a1f1dSLionel Sambuc if (finish)
609*0a6a1f1dSLionel Sambuc coder->thr = NULL;
610*0a6a1f1dSLionel Sambuc }
611*0a6a1f1dSLionel Sambuc
612*0a6a1f1dSLionel Sambuc return LZMA_OK;
613*0a6a1f1dSLionel Sambuc }
614*0a6a1f1dSLionel Sambuc
615*0a6a1f1dSLionel Sambuc
616*0a6a1f1dSLionel Sambuc /// Wait until more input can be consumed, more output can be read, or
617*0a6a1f1dSLionel Sambuc /// an optional timeout is reached.
618*0a6a1f1dSLionel Sambuc static bool
wait_for_work(lzma_coder * coder,mythread_condtime * wait_abs,bool * has_blocked,bool has_input)619*0a6a1f1dSLionel Sambuc wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
620*0a6a1f1dSLionel Sambuc bool *has_blocked, bool has_input)
621*0a6a1f1dSLionel Sambuc {
622*0a6a1f1dSLionel Sambuc if (coder->timeout != 0 && !*has_blocked) {
623*0a6a1f1dSLionel Sambuc // Every time when stream_encode_mt() is called via
624*0a6a1f1dSLionel Sambuc // lzma_code(), *has_blocked starts as false. We set it
625*0a6a1f1dSLionel Sambuc // to true here and calculate the absolute time when
626*0a6a1f1dSLionel Sambuc // we must return if there's nothing to do.
627*0a6a1f1dSLionel Sambuc //
628*0a6a1f1dSLionel Sambuc // The idea of *has_blocked is to avoid unneeded calls
629*0a6a1f1dSLionel Sambuc // to mythread_condtime_set(), which may do a syscall
630*0a6a1f1dSLionel Sambuc // depending on the operating system.
631*0a6a1f1dSLionel Sambuc *has_blocked = true;
632*0a6a1f1dSLionel Sambuc mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
633*0a6a1f1dSLionel Sambuc }
634*0a6a1f1dSLionel Sambuc
635*0a6a1f1dSLionel Sambuc bool timed_out = false;
636*0a6a1f1dSLionel Sambuc
637*0a6a1f1dSLionel Sambuc mythread_sync(coder->mutex) {
638*0a6a1f1dSLionel Sambuc // There are four things that we wait. If one of them
639*0a6a1f1dSLionel Sambuc // becomes possible, we return.
640*0a6a1f1dSLionel Sambuc // - If there is input left, we need to get a free
641*0a6a1f1dSLionel Sambuc // worker thread and an output buffer for it.
642*0a6a1f1dSLionel Sambuc // - Data ready to be read from the output queue.
643*0a6a1f1dSLionel Sambuc // - A worker thread indicates an error.
644*0a6a1f1dSLionel Sambuc // - Time out occurs.
645*0a6a1f1dSLionel Sambuc while ((!has_input || coder->threads_free == NULL
646*0a6a1f1dSLionel Sambuc || !lzma_outq_has_buf(&coder->outq))
647*0a6a1f1dSLionel Sambuc && !lzma_outq_is_readable(&coder->outq)
648*0a6a1f1dSLionel Sambuc && coder->thread_error == LZMA_OK
649*0a6a1f1dSLionel Sambuc && !timed_out) {
650*0a6a1f1dSLionel Sambuc if (coder->timeout != 0)
651*0a6a1f1dSLionel Sambuc timed_out = mythread_cond_timedwait(
652*0a6a1f1dSLionel Sambuc &coder->cond, &coder->mutex,
653*0a6a1f1dSLionel Sambuc wait_abs) != 0;
654*0a6a1f1dSLionel Sambuc else
655*0a6a1f1dSLionel Sambuc mythread_cond_wait(&coder->cond,
656*0a6a1f1dSLionel Sambuc &coder->mutex);
657*0a6a1f1dSLionel Sambuc }
658*0a6a1f1dSLionel Sambuc }
659*0a6a1f1dSLionel Sambuc
660*0a6a1f1dSLionel Sambuc return timed_out;
661*0a6a1f1dSLionel Sambuc }
662*0a6a1f1dSLionel Sambuc
663*0a6a1f1dSLionel Sambuc
664*0a6a1f1dSLionel Sambuc static lzma_ret
stream_encode_mt(lzma_coder * coder,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,uint8_t * restrict out,size_t * restrict out_pos,size_t out_size,lzma_action action)665*0a6a1f1dSLionel Sambuc stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
666*0a6a1f1dSLionel Sambuc const uint8_t *restrict in, size_t *restrict in_pos,
667*0a6a1f1dSLionel Sambuc size_t in_size, uint8_t *restrict out,
668*0a6a1f1dSLionel Sambuc size_t *restrict out_pos, size_t out_size, lzma_action action)
669*0a6a1f1dSLionel Sambuc {
670*0a6a1f1dSLionel Sambuc switch (coder->sequence) {
671*0a6a1f1dSLionel Sambuc case SEQ_STREAM_HEADER:
672*0a6a1f1dSLionel Sambuc lzma_bufcpy(coder->header, &coder->header_pos,
673*0a6a1f1dSLionel Sambuc sizeof(coder->header),
674*0a6a1f1dSLionel Sambuc out, out_pos, out_size);
675*0a6a1f1dSLionel Sambuc if (coder->header_pos < sizeof(coder->header))
676*0a6a1f1dSLionel Sambuc return LZMA_OK;
677*0a6a1f1dSLionel Sambuc
678*0a6a1f1dSLionel Sambuc coder->header_pos = 0;
679*0a6a1f1dSLionel Sambuc coder->sequence = SEQ_BLOCK;
680*0a6a1f1dSLionel Sambuc
681*0a6a1f1dSLionel Sambuc // Fall through
682*0a6a1f1dSLionel Sambuc
683*0a6a1f1dSLionel Sambuc case SEQ_BLOCK: {
684*0a6a1f1dSLionel Sambuc // Initialized to silence warnings.
685*0a6a1f1dSLionel Sambuc lzma_vli unpadded_size = 0;
686*0a6a1f1dSLionel Sambuc lzma_vli uncompressed_size = 0;
687*0a6a1f1dSLionel Sambuc lzma_ret ret = LZMA_OK;
688*0a6a1f1dSLionel Sambuc
689*0a6a1f1dSLionel Sambuc // These are for wait_for_work().
690*0a6a1f1dSLionel Sambuc bool has_blocked = false;
691*0a6a1f1dSLionel Sambuc mythread_condtime wait_abs;
692*0a6a1f1dSLionel Sambuc
693*0a6a1f1dSLionel Sambuc while (true) {
694*0a6a1f1dSLionel Sambuc mythread_sync(coder->mutex) {
695*0a6a1f1dSLionel Sambuc // Check for Block encoder errors.
696*0a6a1f1dSLionel Sambuc ret = coder->thread_error;
697*0a6a1f1dSLionel Sambuc if (ret != LZMA_OK) {
698*0a6a1f1dSLionel Sambuc assert(ret != LZMA_STREAM_END);
699*0a6a1f1dSLionel Sambuc break;
700*0a6a1f1dSLionel Sambuc }
701*0a6a1f1dSLionel Sambuc
702*0a6a1f1dSLionel Sambuc // Try to read compressed data to out[].
703*0a6a1f1dSLionel Sambuc ret = lzma_outq_read(&coder->outq,
704*0a6a1f1dSLionel Sambuc out, out_pos, out_size,
705*0a6a1f1dSLionel Sambuc &unpadded_size,
706*0a6a1f1dSLionel Sambuc &uncompressed_size);
707*0a6a1f1dSLionel Sambuc }
708*0a6a1f1dSLionel Sambuc
709*0a6a1f1dSLionel Sambuc if (ret == LZMA_STREAM_END) {
710*0a6a1f1dSLionel Sambuc // End of Block. Add it to the Index.
711*0a6a1f1dSLionel Sambuc ret = lzma_index_append(coder->index,
712*0a6a1f1dSLionel Sambuc allocator, unpadded_size,
713*0a6a1f1dSLionel Sambuc uncompressed_size);
714*0a6a1f1dSLionel Sambuc
715*0a6a1f1dSLionel Sambuc // If we didn't fill the output buffer yet,
716*0a6a1f1dSLionel Sambuc // try to read more data. Maybe the next
717*0a6a1f1dSLionel Sambuc // outbuf has been finished already too.
718*0a6a1f1dSLionel Sambuc if (*out_pos < out_size)
719*0a6a1f1dSLionel Sambuc continue;
720*0a6a1f1dSLionel Sambuc }
721*0a6a1f1dSLionel Sambuc
722*0a6a1f1dSLionel Sambuc if (ret != LZMA_OK) {
723*0a6a1f1dSLionel Sambuc // coder->thread_error was set or
724*0a6a1f1dSLionel Sambuc // lzma_index_append() failed.
725*0a6a1f1dSLionel Sambuc threads_stop(coder, false);
726*0a6a1f1dSLionel Sambuc return ret;
727*0a6a1f1dSLionel Sambuc }
728*0a6a1f1dSLionel Sambuc
729*0a6a1f1dSLionel Sambuc // Try to give uncompressed data to a worker thread.
730*0a6a1f1dSLionel Sambuc ret = stream_encode_in(coder, allocator,
731*0a6a1f1dSLionel Sambuc in, in_pos, in_size, action);
732*0a6a1f1dSLionel Sambuc if (ret != LZMA_OK) {
733*0a6a1f1dSLionel Sambuc threads_stop(coder, false);
734*0a6a1f1dSLionel Sambuc return ret;
735*0a6a1f1dSLionel Sambuc }
736*0a6a1f1dSLionel Sambuc
737*0a6a1f1dSLionel Sambuc // See if we should wait or return.
738*0a6a1f1dSLionel Sambuc //
739*0a6a1f1dSLionel Sambuc // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
740*0a6a1f1dSLionel Sambuc if (*in_pos == in_size) {
741*0a6a1f1dSLionel Sambuc // LZMA_RUN: More data is probably coming
742*0a6a1f1dSLionel Sambuc // so return to let the caller fill the
743*0a6a1f1dSLionel Sambuc // input buffer.
744*0a6a1f1dSLionel Sambuc if (action == LZMA_RUN)
745*0a6a1f1dSLionel Sambuc return LZMA_OK;
746*0a6a1f1dSLionel Sambuc
747*0a6a1f1dSLionel Sambuc // LZMA_FULL_BARRIER: The same as with
748*0a6a1f1dSLionel Sambuc // LZMA_RUN but tell the caller that the
749*0a6a1f1dSLionel Sambuc // barrier was completed.
750*0a6a1f1dSLionel Sambuc if (action == LZMA_FULL_BARRIER)
751*0a6a1f1dSLionel Sambuc return LZMA_STREAM_END;
752*0a6a1f1dSLionel Sambuc
753*0a6a1f1dSLionel Sambuc // Finishing or flushing isn't completed until
754*0a6a1f1dSLionel Sambuc // all input data has been encoded and copied
755*0a6a1f1dSLionel Sambuc // to the output buffer.
756*0a6a1f1dSLionel Sambuc if (lzma_outq_is_empty(&coder->outq)) {
757*0a6a1f1dSLionel Sambuc // LZMA_FINISH: Continue to encode
758*0a6a1f1dSLionel Sambuc // the Index field.
759*0a6a1f1dSLionel Sambuc if (action == LZMA_FINISH)
760*0a6a1f1dSLionel Sambuc break;
761*0a6a1f1dSLionel Sambuc
762*0a6a1f1dSLionel Sambuc // LZMA_FULL_FLUSH: Return to tell
763*0a6a1f1dSLionel Sambuc // the caller that flushing was
764*0a6a1f1dSLionel Sambuc // completed.
765*0a6a1f1dSLionel Sambuc if (action == LZMA_FULL_FLUSH)
766*0a6a1f1dSLionel Sambuc return LZMA_STREAM_END;
767*0a6a1f1dSLionel Sambuc }
768*0a6a1f1dSLionel Sambuc }
769*0a6a1f1dSLionel Sambuc
770*0a6a1f1dSLionel Sambuc // Return if there is no output space left.
771*0a6a1f1dSLionel Sambuc // This check must be done after testing the input
772*0a6a1f1dSLionel Sambuc // buffer, because we might want to use a different
773*0a6a1f1dSLionel Sambuc // return code.
774*0a6a1f1dSLionel Sambuc if (*out_pos == out_size)
775*0a6a1f1dSLionel Sambuc return LZMA_OK;
776*0a6a1f1dSLionel Sambuc
777*0a6a1f1dSLionel Sambuc // Neither in nor out has been used completely.
778*0a6a1f1dSLionel Sambuc // Wait until there's something we can do.
779*0a6a1f1dSLionel Sambuc if (wait_for_work(coder, &wait_abs, &has_blocked,
780*0a6a1f1dSLionel Sambuc *in_pos < in_size))
781*0a6a1f1dSLionel Sambuc return LZMA_TIMED_OUT;
782*0a6a1f1dSLionel Sambuc }
783*0a6a1f1dSLionel Sambuc
784*0a6a1f1dSLionel Sambuc // All Blocks have been encoded and the threads have stopped.
785*0a6a1f1dSLionel Sambuc // Prepare to encode the Index field.
786*0a6a1f1dSLionel Sambuc return_if_error(lzma_index_encoder_init(
787*0a6a1f1dSLionel Sambuc &coder->index_encoder, allocator,
788*0a6a1f1dSLionel Sambuc coder->index));
789*0a6a1f1dSLionel Sambuc coder->sequence = SEQ_INDEX;
790*0a6a1f1dSLionel Sambuc
791*0a6a1f1dSLionel Sambuc // Update the progress info to take the Index and
792*0a6a1f1dSLionel Sambuc // Stream Footer into account. Those are very fast to encode
793*0a6a1f1dSLionel Sambuc // so in terms of progress information they can be thought
794*0a6a1f1dSLionel Sambuc // to be ready to be copied out.
795*0a6a1f1dSLionel Sambuc coder->progress_out += lzma_index_size(coder->index)
796*0a6a1f1dSLionel Sambuc + LZMA_STREAM_HEADER_SIZE;
797*0a6a1f1dSLionel Sambuc }
798*0a6a1f1dSLionel Sambuc
799*0a6a1f1dSLionel Sambuc // Fall through
800*0a6a1f1dSLionel Sambuc
801*0a6a1f1dSLionel Sambuc case SEQ_INDEX: {
802*0a6a1f1dSLionel Sambuc // Call the Index encoder. It doesn't take any input, so
803*0a6a1f1dSLionel Sambuc // those pointers can be NULL.
804*0a6a1f1dSLionel Sambuc const lzma_ret ret = coder->index_encoder.code(
805*0a6a1f1dSLionel Sambuc coder->index_encoder.coder, allocator,
806*0a6a1f1dSLionel Sambuc NULL, NULL, 0,
807*0a6a1f1dSLionel Sambuc out, out_pos, out_size, LZMA_RUN);
808*0a6a1f1dSLionel Sambuc if (ret != LZMA_STREAM_END)
809*0a6a1f1dSLionel Sambuc return ret;
810*0a6a1f1dSLionel Sambuc
811*0a6a1f1dSLionel Sambuc // Encode the Stream Footer into coder->buffer.
812*0a6a1f1dSLionel Sambuc coder->stream_flags.backward_size
813*0a6a1f1dSLionel Sambuc = lzma_index_size(coder->index);
814*0a6a1f1dSLionel Sambuc if (lzma_stream_footer_encode(&coder->stream_flags,
815*0a6a1f1dSLionel Sambuc coder->header) != LZMA_OK)
816*0a6a1f1dSLionel Sambuc return LZMA_PROG_ERROR;
817*0a6a1f1dSLionel Sambuc
818*0a6a1f1dSLionel Sambuc coder->sequence = SEQ_STREAM_FOOTER;
819*0a6a1f1dSLionel Sambuc }
820*0a6a1f1dSLionel Sambuc
821*0a6a1f1dSLionel Sambuc // Fall through
822*0a6a1f1dSLionel Sambuc
823*0a6a1f1dSLionel Sambuc case SEQ_STREAM_FOOTER:
824*0a6a1f1dSLionel Sambuc lzma_bufcpy(coder->header, &coder->header_pos,
825*0a6a1f1dSLionel Sambuc sizeof(coder->header),
826*0a6a1f1dSLionel Sambuc out, out_pos, out_size);
827*0a6a1f1dSLionel Sambuc return coder->header_pos < sizeof(coder->header)
828*0a6a1f1dSLionel Sambuc ? LZMA_OK : LZMA_STREAM_END;
829*0a6a1f1dSLionel Sambuc }
830*0a6a1f1dSLionel Sambuc
831*0a6a1f1dSLionel Sambuc assert(0);
832*0a6a1f1dSLionel Sambuc return LZMA_PROG_ERROR;
833*0a6a1f1dSLionel Sambuc }
834*0a6a1f1dSLionel Sambuc
835*0a6a1f1dSLionel Sambuc
836*0a6a1f1dSLionel Sambuc static void
stream_encoder_mt_end(lzma_coder * coder,const lzma_allocator * allocator)837*0a6a1f1dSLionel Sambuc stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
838*0a6a1f1dSLionel Sambuc {
839*0a6a1f1dSLionel Sambuc // Threads must be killed before the output queue can be freed.
840*0a6a1f1dSLionel Sambuc threads_end(coder, allocator);
841*0a6a1f1dSLionel Sambuc lzma_outq_end(&coder->outq, allocator);
842*0a6a1f1dSLionel Sambuc
843*0a6a1f1dSLionel Sambuc for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
844*0a6a1f1dSLionel Sambuc lzma_free(coder->filters[i].options, allocator);
845*0a6a1f1dSLionel Sambuc
846*0a6a1f1dSLionel Sambuc lzma_next_end(&coder->index_encoder, allocator);
847*0a6a1f1dSLionel Sambuc lzma_index_end(coder->index, allocator);
848*0a6a1f1dSLionel Sambuc
849*0a6a1f1dSLionel Sambuc mythread_cond_destroy(&coder->cond);
850*0a6a1f1dSLionel Sambuc mythread_mutex_destroy(&coder->mutex);
851*0a6a1f1dSLionel Sambuc
852*0a6a1f1dSLionel Sambuc lzma_free(coder, allocator);
853*0a6a1f1dSLionel Sambuc return;
854*0a6a1f1dSLionel Sambuc }
855*0a6a1f1dSLionel Sambuc
856*0a6a1f1dSLionel Sambuc
857*0a6a1f1dSLionel Sambuc /// Options handling for lzma_stream_encoder_mt_init() and
858*0a6a1f1dSLionel Sambuc /// lzma_stream_encoder_mt_memusage()
859*0a6a1f1dSLionel Sambuc static lzma_ret
get_options(const lzma_mt * options,lzma_options_easy * opt_easy,const lzma_filter ** filters,uint64_t * block_size,uint64_t * outbuf_size_max)860*0a6a1f1dSLionel Sambuc get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
861*0a6a1f1dSLionel Sambuc const lzma_filter **filters, uint64_t *block_size,
862*0a6a1f1dSLionel Sambuc uint64_t *outbuf_size_max)
863*0a6a1f1dSLionel Sambuc {
864*0a6a1f1dSLionel Sambuc // Validate some of the options.
865*0a6a1f1dSLionel Sambuc if (options == NULL)
866*0a6a1f1dSLionel Sambuc return LZMA_PROG_ERROR;
867*0a6a1f1dSLionel Sambuc
868*0a6a1f1dSLionel Sambuc if (options->flags != 0 || options->threads == 0
869*0a6a1f1dSLionel Sambuc || options->threads > LZMA_THREADS_MAX)
870*0a6a1f1dSLionel Sambuc return LZMA_OPTIONS_ERROR;
871*0a6a1f1dSLionel Sambuc
872*0a6a1f1dSLionel Sambuc if (options->filters != NULL) {
873*0a6a1f1dSLionel Sambuc // Filter chain was given, use it as is.
874*0a6a1f1dSLionel Sambuc *filters = options->filters;
875*0a6a1f1dSLionel Sambuc } else {
876*0a6a1f1dSLionel Sambuc // Use a preset.
877*0a6a1f1dSLionel Sambuc if (lzma_easy_preset(opt_easy, options->preset))
878*0a6a1f1dSLionel Sambuc return LZMA_OPTIONS_ERROR;
879*0a6a1f1dSLionel Sambuc
880*0a6a1f1dSLionel Sambuc *filters = opt_easy->filters;
881*0a6a1f1dSLionel Sambuc }
882*0a6a1f1dSLionel Sambuc
883*0a6a1f1dSLionel Sambuc // Block size
884*0a6a1f1dSLionel Sambuc if (options->block_size > 0) {
885*0a6a1f1dSLionel Sambuc if (options->block_size > BLOCK_SIZE_MAX)
886*0a6a1f1dSLionel Sambuc return LZMA_OPTIONS_ERROR;
887*0a6a1f1dSLionel Sambuc
888*0a6a1f1dSLionel Sambuc *block_size = options->block_size;
889*0a6a1f1dSLionel Sambuc } else {
890*0a6a1f1dSLionel Sambuc // Determine the Block size from the filter chain.
891*0a6a1f1dSLionel Sambuc *block_size = lzma_mt_block_size(*filters);
892*0a6a1f1dSLionel Sambuc if (*block_size == 0)
893*0a6a1f1dSLionel Sambuc return LZMA_OPTIONS_ERROR;
894*0a6a1f1dSLionel Sambuc
895*0a6a1f1dSLionel Sambuc assert(*block_size <= BLOCK_SIZE_MAX);
896*0a6a1f1dSLionel Sambuc }
897*0a6a1f1dSLionel Sambuc
898*0a6a1f1dSLionel Sambuc // Calculate the maximum amount output that a single output buffer
899*0a6a1f1dSLionel Sambuc // may need to hold. This is the same as the maximum total size of
900*0a6a1f1dSLionel Sambuc // a Block.
901*0a6a1f1dSLionel Sambuc *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
902*0a6a1f1dSLionel Sambuc if (*outbuf_size_max == 0)
903*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
904*0a6a1f1dSLionel Sambuc
905*0a6a1f1dSLionel Sambuc return LZMA_OK;
906*0a6a1f1dSLionel Sambuc }
907*0a6a1f1dSLionel Sambuc
908*0a6a1f1dSLionel Sambuc
909*0a6a1f1dSLionel Sambuc static void
get_progress(lzma_coder * coder,uint64_t * progress_in,uint64_t * progress_out)910*0a6a1f1dSLionel Sambuc get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
911*0a6a1f1dSLionel Sambuc {
912*0a6a1f1dSLionel Sambuc // Lock coder->mutex to prevent finishing threads from moving their
913*0a6a1f1dSLionel Sambuc // progress info from the worker_thread structure to lzma_coder.
914*0a6a1f1dSLionel Sambuc mythread_sync(coder->mutex) {
915*0a6a1f1dSLionel Sambuc *progress_in = coder->progress_in;
916*0a6a1f1dSLionel Sambuc *progress_out = coder->progress_out;
917*0a6a1f1dSLionel Sambuc
918*0a6a1f1dSLionel Sambuc for (size_t i = 0; i < coder->threads_initialized; ++i) {
919*0a6a1f1dSLionel Sambuc mythread_sync(coder->threads[i].mutex) {
920*0a6a1f1dSLionel Sambuc *progress_in += coder->threads[i].progress_in;
921*0a6a1f1dSLionel Sambuc *progress_out += coder->threads[i]
922*0a6a1f1dSLionel Sambuc .progress_out;
923*0a6a1f1dSLionel Sambuc }
924*0a6a1f1dSLionel Sambuc }
925*0a6a1f1dSLionel Sambuc }
926*0a6a1f1dSLionel Sambuc
927*0a6a1f1dSLionel Sambuc return;
928*0a6a1f1dSLionel Sambuc }
929*0a6a1f1dSLionel Sambuc
930*0a6a1f1dSLionel Sambuc
931*0a6a1f1dSLionel Sambuc static lzma_ret
stream_encoder_mt_init(lzma_next_coder * next,const lzma_allocator * allocator,const lzma_mt * options)932*0a6a1f1dSLionel Sambuc stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
933*0a6a1f1dSLionel Sambuc const lzma_mt *options)
934*0a6a1f1dSLionel Sambuc {
935*0a6a1f1dSLionel Sambuc lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
936*0a6a1f1dSLionel Sambuc
937*0a6a1f1dSLionel Sambuc // Get the filter chain.
938*0a6a1f1dSLionel Sambuc lzma_options_easy easy;
939*0a6a1f1dSLionel Sambuc const lzma_filter *filters;
940*0a6a1f1dSLionel Sambuc uint64_t block_size;
941*0a6a1f1dSLionel Sambuc uint64_t outbuf_size_max;
942*0a6a1f1dSLionel Sambuc return_if_error(get_options(options, &easy, &filters,
943*0a6a1f1dSLionel Sambuc &block_size, &outbuf_size_max));
944*0a6a1f1dSLionel Sambuc
945*0a6a1f1dSLionel Sambuc #if SIZE_MAX < UINT64_MAX
946*0a6a1f1dSLionel Sambuc if (block_size > SIZE_MAX)
947*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
948*0a6a1f1dSLionel Sambuc #endif
949*0a6a1f1dSLionel Sambuc
950*0a6a1f1dSLionel Sambuc // Validate the filter chain so that we can give an error in this
951*0a6a1f1dSLionel Sambuc // function instead of delaying it to the first call to lzma_code().
952*0a6a1f1dSLionel Sambuc // The memory usage calculation verifies the filter chain as
953*0a6a1f1dSLionel Sambuc // a side effect so we take advatange of that.
954*0a6a1f1dSLionel Sambuc if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
955*0a6a1f1dSLionel Sambuc return LZMA_OPTIONS_ERROR;
956*0a6a1f1dSLionel Sambuc
957*0a6a1f1dSLionel Sambuc // Validate the Check ID.
958*0a6a1f1dSLionel Sambuc if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
959*0a6a1f1dSLionel Sambuc return LZMA_PROG_ERROR;
960*0a6a1f1dSLionel Sambuc
961*0a6a1f1dSLionel Sambuc if (!lzma_check_is_supported(options->check))
962*0a6a1f1dSLionel Sambuc return LZMA_UNSUPPORTED_CHECK;
963*0a6a1f1dSLionel Sambuc
964*0a6a1f1dSLionel Sambuc // Allocate and initialize the base structure if needed.
965*0a6a1f1dSLionel Sambuc if (next->coder == NULL) {
966*0a6a1f1dSLionel Sambuc next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
967*0a6a1f1dSLionel Sambuc if (next->coder == NULL)
968*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
969*0a6a1f1dSLionel Sambuc
970*0a6a1f1dSLionel Sambuc // For the mutex and condition variable initializations
971*0a6a1f1dSLionel Sambuc // the error handling has to be done here because
972*0a6a1f1dSLionel Sambuc // stream_encoder_mt_end() doesn't know if they have
973*0a6a1f1dSLionel Sambuc // already been initialized or not.
974*0a6a1f1dSLionel Sambuc if (mythread_mutex_init(&next->coder->mutex)) {
975*0a6a1f1dSLionel Sambuc lzma_free(next->coder, allocator);
976*0a6a1f1dSLionel Sambuc next->coder = NULL;
977*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
978*0a6a1f1dSLionel Sambuc }
979*0a6a1f1dSLionel Sambuc
980*0a6a1f1dSLionel Sambuc if (mythread_cond_init(&next->coder->cond)) {
981*0a6a1f1dSLionel Sambuc mythread_mutex_destroy(&next->coder->mutex);
982*0a6a1f1dSLionel Sambuc lzma_free(next->coder, allocator);
983*0a6a1f1dSLionel Sambuc next->coder = NULL;
984*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
985*0a6a1f1dSLionel Sambuc }
986*0a6a1f1dSLionel Sambuc
987*0a6a1f1dSLionel Sambuc next->code = &stream_encode_mt;
988*0a6a1f1dSLionel Sambuc next->end = &stream_encoder_mt_end;
989*0a6a1f1dSLionel Sambuc next->get_progress = &get_progress;
990*0a6a1f1dSLionel Sambuc // next->update = &stream_encoder_mt_update;
991*0a6a1f1dSLionel Sambuc
992*0a6a1f1dSLionel Sambuc next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
993*0a6a1f1dSLionel Sambuc next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
994*0a6a1f1dSLionel Sambuc next->coder->index = NULL;
995*0a6a1f1dSLionel Sambuc memzero(&next->coder->outq, sizeof(next->coder->outq));
996*0a6a1f1dSLionel Sambuc next->coder->threads = NULL;
997*0a6a1f1dSLionel Sambuc next->coder->threads_max = 0;
998*0a6a1f1dSLionel Sambuc next->coder->threads_initialized = 0;
999*0a6a1f1dSLionel Sambuc }
1000*0a6a1f1dSLionel Sambuc
1001*0a6a1f1dSLionel Sambuc // Basic initializations
1002*0a6a1f1dSLionel Sambuc next->coder->sequence = SEQ_STREAM_HEADER;
1003*0a6a1f1dSLionel Sambuc next->coder->block_size = (size_t)(block_size);
1004*0a6a1f1dSLionel Sambuc next->coder->thread_error = LZMA_OK;
1005*0a6a1f1dSLionel Sambuc next->coder->thr = NULL;
1006*0a6a1f1dSLionel Sambuc
1007*0a6a1f1dSLionel Sambuc // Allocate the thread-specific base structures.
1008*0a6a1f1dSLionel Sambuc assert(options->threads > 0);
1009*0a6a1f1dSLionel Sambuc if (next->coder->threads_max != options->threads) {
1010*0a6a1f1dSLionel Sambuc threads_end(next->coder, allocator);
1011*0a6a1f1dSLionel Sambuc
1012*0a6a1f1dSLionel Sambuc next->coder->threads = NULL;
1013*0a6a1f1dSLionel Sambuc next->coder->threads_max = 0;
1014*0a6a1f1dSLionel Sambuc
1015*0a6a1f1dSLionel Sambuc next->coder->threads_initialized = 0;
1016*0a6a1f1dSLionel Sambuc next->coder->threads_free = NULL;
1017*0a6a1f1dSLionel Sambuc
1018*0a6a1f1dSLionel Sambuc next->coder->threads = lzma_alloc(
1019*0a6a1f1dSLionel Sambuc options->threads * sizeof(worker_thread),
1020*0a6a1f1dSLionel Sambuc allocator);
1021*0a6a1f1dSLionel Sambuc if (next->coder->threads == NULL)
1022*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
1023*0a6a1f1dSLionel Sambuc
1024*0a6a1f1dSLionel Sambuc next->coder->threads_max = options->threads;
1025*0a6a1f1dSLionel Sambuc } else {
1026*0a6a1f1dSLionel Sambuc // Reuse the old structures and threads. Tell the running
1027*0a6a1f1dSLionel Sambuc // threads to stop and wait until they have stopped.
1028*0a6a1f1dSLionel Sambuc threads_stop(next->coder, true);
1029*0a6a1f1dSLionel Sambuc }
1030*0a6a1f1dSLionel Sambuc
1031*0a6a1f1dSLionel Sambuc // Output queue
1032*0a6a1f1dSLionel Sambuc return_if_error(lzma_outq_init(&next->coder->outq, allocator,
1033*0a6a1f1dSLionel Sambuc outbuf_size_max, options->threads));
1034*0a6a1f1dSLionel Sambuc
1035*0a6a1f1dSLionel Sambuc // Timeout
1036*0a6a1f1dSLionel Sambuc next->coder->timeout = options->timeout;
1037*0a6a1f1dSLionel Sambuc
1038*0a6a1f1dSLionel Sambuc // Free the old filter chain and copy the new one.
1039*0a6a1f1dSLionel Sambuc for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1040*0a6a1f1dSLionel Sambuc lzma_free(next->coder->filters[i].options, allocator);
1041*0a6a1f1dSLionel Sambuc
1042*0a6a1f1dSLionel Sambuc return_if_error(lzma_filters_copy(
1043*0a6a1f1dSLionel Sambuc filters, next->coder->filters, allocator));
1044*0a6a1f1dSLionel Sambuc
1045*0a6a1f1dSLionel Sambuc // Index
1046*0a6a1f1dSLionel Sambuc lzma_index_end(next->coder->index, allocator);
1047*0a6a1f1dSLionel Sambuc next->coder->index = lzma_index_init(allocator);
1048*0a6a1f1dSLionel Sambuc if (next->coder->index == NULL)
1049*0a6a1f1dSLionel Sambuc return LZMA_MEM_ERROR;
1050*0a6a1f1dSLionel Sambuc
1051*0a6a1f1dSLionel Sambuc // Stream Header
1052*0a6a1f1dSLionel Sambuc next->coder->stream_flags.version = 0;
1053*0a6a1f1dSLionel Sambuc next->coder->stream_flags.check = options->check;
1054*0a6a1f1dSLionel Sambuc return_if_error(lzma_stream_header_encode(
1055*0a6a1f1dSLionel Sambuc &next->coder->stream_flags, next->coder->header));
1056*0a6a1f1dSLionel Sambuc
1057*0a6a1f1dSLionel Sambuc next->coder->header_pos = 0;
1058*0a6a1f1dSLionel Sambuc
1059*0a6a1f1dSLionel Sambuc // Progress info
1060*0a6a1f1dSLionel Sambuc next->coder->progress_in = 0;
1061*0a6a1f1dSLionel Sambuc next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1062*0a6a1f1dSLionel Sambuc
1063*0a6a1f1dSLionel Sambuc return LZMA_OK;
1064*0a6a1f1dSLionel Sambuc }
1065*0a6a1f1dSLionel Sambuc
1066*0a6a1f1dSLionel Sambuc
1067*0a6a1f1dSLionel Sambuc extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream * strm,const lzma_mt * options)1068*0a6a1f1dSLionel Sambuc lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1069*0a6a1f1dSLionel Sambuc {
1070*0a6a1f1dSLionel Sambuc lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1071*0a6a1f1dSLionel Sambuc
1072*0a6a1f1dSLionel Sambuc strm->internal->supported_actions[LZMA_RUN] = true;
1073*0a6a1f1dSLionel Sambuc // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1074*0a6a1f1dSLionel Sambuc strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1075*0a6a1f1dSLionel Sambuc strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1076*0a6a1f1dSLionel Sambuc strm->internal->supported_actions[LZMA_FINISH] = true;
1077*0a6a1f1dSLionel Sambuc
1078*0a6a1f1dSLionel Sambuc return LZMA_OK;
1079*0a6a1f1dSLionel Sambuc }
1080*0a6a1f1dSLionel Sambuc
1081*0a6a1f1dSLionel Sambuc
1082*0a6a1f1dSLionel Sambuc // This function name is a monster but it's consistent with the older
1083*0a6a1f1dSLionel Sambuc // monster names. :-( 31 chars is the max that C99 requires so in that
1084*0a6a1f1dSLionel Sambuc // sense it's not too long. ;-)
1085*0a6a1f1dSLionel Sambuc extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt * options)1086*0a6a1f1dSLionel Sambuc lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1087*0a6a1f1dSLionel Sambuc {
1088*0a6a1f1dSLionel Sambuc lzma_options_easy easy;
1089*0a6a1f1dSLionel Sambuc const lzma_filter *filters;
1090*0a6a1f1dSLionel Sambuc uint64_t block_size;
1091*0a6a1f1dSLionel Sambuc uint64_t outbuf_size_max;
1092*0a6a1f1dSLionel Sambuc
1093*0a6a1f1dSLionel Sambuc if (get_options(options, &easy, &filters, &block_size,
1094*0a6a1f1dSLionel Sambuc &outbuf_size_max) != LZMA_OK)
1095*0a6a1f1dSLionel Sambuc return UINT64_MAX;
1096*0a6a1f1dSLionel Sambuc
1097*0a6a1f1dSLionel Sambuc // Memory usage of the input buffers
1098*0a6a1f1dSLionel Sambuc const uint64_t inbuf_memusage = options->threads * block_size;
1099*0a6a1f1dSLionel Sambuc
1100*0a6a1f1dSLionel Sambuc // Memory usage of the filter encoders
1101*0a6a1f1dSLionel Sambuc uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1102*0a6a1f1dSLionel Sambuc if (filters_memusage == UINT64_MAX)
1103*0a6a1f1dSLionel Sambuc return UINT64_MAX;
1104*0a6a1f1dSLionel Sambuc
1105*0a6a1f1dSLionel Sambuc filters_memusage *= options->threads;
1106*0a6a1f1dSLionel Sambuc
1107*0a6a1f1dSLionel Sambuc // Memory usage of the output queue
1108*0a6a1f1dSLionel Sambuc const uint64_t outq_memusage = lzma_outq_memusage(
1109*0a6a1f1dSLionel Sambuc outbuf_size_max, options->threads);
1110*0a6a1f1dSLionel Sambuc if (outq_memusage == UINT64_MAX)
1111*0a6a1f1dSLionel Sambuc return UINT64_MAX;
1112*0a6a1f1dSLionel Sambuc
1113*0a6a1f1dSLionel Sambuc // Sum them with overflow checking.
1114*0a6a1f1dSLionel Sambuc uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
1115*0a6a1f1dSLionel Sambuc + options->threads * sizeof(worker_thread);
1116*0a6a1f1dSLionel Sambuc
1117*0a6a1f1dSLionel Sambuc if (UINT64_MAX - total_memusage < inbuf_memusage)
1118*0a6a1f1dSLionel Sambuc return UINT64_MAX;
1119*0a6a1f1dSLionel Sambuc
1120*0a6a1f1dSLionel Sambuc total_memusage += inbuf_memusage;
1121*0a6a1f1dSLionel Sambuc
1122*0a6a1f1dSLionel Sambuc if (UINT64_MAX - total_memusage < filters_memusage)
1123*0a6a1f1dSLionel Sambuc return UINT64_MAX;
1124*0a6a1f1dSLionel Sambuc
1125*0a6a1f1dSLionel Sambuc total_memusage += filters_memusage;
1126*0a6a1f1dSLionel Sambuc
1127*0a6a1f1dSLionel Sambuc if (UINT64_MAX - total_memusage < outq_memusage)
1128*0a6a1f1dSLionel Sambuc return UINT64_MAX;
1129*0a6a1f1dSLionel Sambuc
1130*0a6a1f1dSLionel Sambuc return total_memusage + outq_memusage;
1131*0a6a1f1dSLionel Sambuc }
1132