xref: /freebsd-src/contrib/xz/src/liblzma/common/stream_encoder_mt.c (revision 1456f0f9681bbd7fdae7b683553f6c7491508c4e)
153200025SRui Paulo ///////////////////////////////////////////////////////////////////////////////
253200025SRui Paulo //
353200025SRui Paulo /// \file       stream_encoder_mt.c
453200025SRui Paulo /// \brief      Multithreaded .xz Stream encoder
553200025SRui Paulo //
653200025SRui Paulo //  Author:     Lasse Collin
753200025SRui Paulo //
853200025SRui Paulo //  This file has been put into the public domain.
953200025SRui Paulo //  You can do whatever you want with this file.
1053200025SRui Paulo //
1153200025SRui Paulo ///////////////////////////////////////////////////////////////////////////////
1253200025SRui Paulo 
1353200025SRui Paulo #include "filter_encoder.h"
1453200025SRui Paulo #include "easy_preset.h"
1553200025SRui Paulo #include "block_encoder.h"
1653200025SRui Paulo #include "block_buffer_encoder.h"
1753200025SRui Paulo #include "index_encoder.h"
1853200025SRui Paulo #include "outqueue.h"
1953200025SRui Paulo 
2053200025SRui Paulo 
/// Largest Block size accepted by the threaded encoder. The limit is chosen
/// so that a Block size multiplied by any thread count up to
/// LZMA_THREADS_MAX still fits in a uint64_t, which keeps size arithmetic
/// free of overflow checks even for unusually large Block sizes.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


/// Life cycle of a worker thread.
///
/// The numeric order of the values is significant: the code tests
/// "state <= THR_FINISH" (there is encoding work to do) and
/// "state >= THR_STOP" (the main thread wants the work abandoned).
typedef enum {
	/// Idle, waiting for the main thread to hand out a new Block.
	THR_IDLE = 0,

	/// Encoding a Block; the main thread may still append input.
	THR_RUN = 1,

	/// Encoding a Block; no more input will be appended.
	THR_FINISH = 2,

	/// The main thread wants the current work dropped. The thread
	/// returns to THR_IDLE but keeps running.
	THR_STOP = 3,

	/// The main thread wants the thread to terminate. Thread
	/// cancellation could achieve this too, but since stopping has
	/// to be supported anyway, reusing that mechanism is simpler.
	THR_EXIT = 4,
} worker_state;
4653200025SRui Paulo 
47*1456f0f9SXin LI typedef struct lzma_stream_coder_s lzma_stream_coder;
4853200025SRui Paulo 
4953200025SRui Paulo typedef struct worker_thread_s worker_thread;
5053200025SRui Paulo struct worker_thread_s {
5153200025SRui Paulo 	worker_state state;
5253200025SRui Paulo 
5353200025SRui Paulo 	/// Input buffer of coder->block_size bytes. The main thread will
5453200025SRui Paulo 	/// put new input into this and update in_size accordingly. Once
5553200025SRui Paulo 	/// no more input is coming, state will be set to THR_FINISH.
5653200025SRui Paulo 	uint8_t *in;
5753200025SRui Paulo 
5853200025SRui Paulo 	/// Amount of data available in the input buffer. This is modified
5953200025SRui Paulo 	/// only by the main thread.
6053200025SRui Paulo 	size_t in_size;
6153200025SRui Paulo 
6253200025SRui Paulo 	/// Output buffer for this thread. This is set by the main
6353200025SRui Paulo 	/// thread every time a new Block is started with this thread
6453200025SRui Paulo 	/// structure.
6553200025SRui Paulo 	lzma_outbuf *outbuf;
6653200025SRui Paulo 
6753200025SRui Paulo 	/// Pointer to the main structure is needed when putting this
6853200025SRui Paulo 	/// thread back to the stack of free threads.
69*1456f0f9SXin LI 	lzma_stream_coder *coder;
7053200025SRui Paulo 
7153200025SRui Paulo 	/// The allocator is set by the main thread. Since a copy of the
7253200025SRui Paulo 	/// pointer is kept here, the application must not change the
7353200025SRui Paulo 	/// allocator before calling lzma_end().
7453200025SRui Paulo 	const lzma_allocator *allocator;
7553200025SRui Paulo 
7653200025SRui Paulo 	/// Amount of uncompressed data that has already been compressed.
7753200025SRui Paulo 	uint64_t progress_in;
7853200025SRui Paulo 
7953200025SRui Paulo 	/// Amount of compressed data that is ready.
8053200025SRui Paulo 	uint64_t progress_out;
8153200025SRui Paulo 
8253200025SRui Paulo 	/// Block encoder
8353200025SRui Paulo 	lzma_next_coder block_encoder;
8453200025SRui Paulo 
8553200025SRui Paulo 	/// Compression options for this Block
8653200025SRui Paulo 	lzma_block block_options;
8753200025SRui Paulo 
8853200025SRui Paulo 	/// Next structure in the stack of free worker threads.
8953200025SRui Paulo 	worker_thread *next;
9053200025SRui Paulo 
9153200025SRui Paulo 	mythread_mutex mutex;
9253200025SRui Paulo 	mythread_cond cond;
9353200025SRui Paulo 
9453200025SRui Paulo 	/// The ID of this thread is used to join the thread
9553200025SRui Paulo 	/// when it's not needed anymore.
9653200025SRui Paulo 	mythread thread_id;
9753200025SRui Paulo };
9853200025SRui Paulo 
9953200025SRui Paulo 
100*1456f0f9SXin LI struct lzma_stream_coder_s {
10153200025SRui Paulo 	enum {
10253200025SRui Paulo 		SEQ_STREAM_HEADER,
10353200025SRui Paulo 		SEQ_BLOCK,
10453200025SRui Paulo 		SEQ_INDEX,
10553200025SRui Paulo 		SEQ_STREAM_FOOTER,
10653200025SRui Paulo 	} sequence;
10753200025SRui Paulo 
10853200025SRui Paulo 	/// Start a new Block every block_size bytes of input unless
10953200025SRui Paulo 	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
11053200025SRui Paulo 	size_t block_size;
11153200025SRui Paulo 
11253200025SRui Paulo 	/// The filter chain currently in use
11353200025SRui Paulo 	lzma_filter filters[LZMA_FILTERS_MAX + 1];
11453200025SRui Paulo 
11553200025SRui Paulo 
11653200025SRui Paulo 	/// Index to hold sizes of the Blocks
11753200025SRui Paulo 	lzma_index *index;
11853200025SRui Paulo 
11953200025SRui Paulo 	/// Index encoder
12053200025SRui Paulo 	lzma_next_coder index_encoder;
12153200025SRui Paulo 
12253200025SRui Paulo 
12353200025SRui Paulo 	/// Stream Flags for encoding the Stream Header and Stream Footer.
12453200025SRui Paulo 	lzma_stream_flags stream_flags;
12553200025SRui Paulo 
12653200025SRui Paulo 	/// Buffer to hold Stream Header and Stream Footer.
12753200025SRui Paulo 	uint8_t header[LZMA_STREAM_HEADER_SIZE];
12853200025SRui Paulo 
12953200025SRui Paulo 	/// Read position in header[]
13053200025SRui Paulo 	size_t header_pos;
13153200025SRui Paulo 
13253200025SRui Paulo 
13353200025SRui Paulo 	/// Output buffer queue for compressed data
13453200025SRui Paulo 	lzma_outq outq;
13553200025SRui Paulo 
13653200025SRui Paulo 
13753200025SRui Paulo 	/// Maximum wait time if cannot use all the input and cannot
13853200025SRui Paulo 	/// fill the output buffer. This is in milliseconds.
13953200025SRui Paulo 	uint32_t timeout;
14053200025SRui Paulo 
14153200025SRui Paulo 
14253200025SRui Paulo 	/// Error code from a worker thread
14353200025SRui Paulo 	lzma_ret thread_error;
14453200025SRui Paulo 
14553200025SRui Paulo 	/// Array of allocated thread-specific structures
14653200025SRui Paulo 	worker_thread *threads;
14753200025SRui Paulo 
14853200025SRui Paulo 	/// Number of structures in "threads" above. This is also the
14953200025SRui Paulo 	/// number of threads that will be created at maximum.
15053200025SRui Paulo 	uint32_t threads_max;
15153200025SRui Paulo 
15253200025SRui Paulo 	/// Number of thread structures that have been initialized, and
15353200025SRui Paulo 	/// thus the number of worker threads actually created so far.
15453200025SRui Paulo 	uint32_t threads_initialized;
15553200025SRui Paulo 
15653200025SRui Paulo 	/// Stack of free threads. When a thread finishes, it puts itself
15753200025SRui Paulo 	/// back into this stack. This starts as empty because threads
15853200025SRui Paulo 	/// are created only when actually needed.
15953200025SRui Paulo 	worker_thread *threads_free;
16053200025SRui Paulo 
16153200025SRui Paulo 	/// The most recent worker thread to which the main thread writes
16253200025SRui Paulo 	/// the new input from the application.
16353200025SRui Paulo 	worker_thread *thr;
16453200025SRui Paulo 
16553200025SRui Paulo 
16653200025SRui Paulo 	/// Amount of uncompressed data in Blocks that have already
16753200025SRui Paulo 	/// been finished.
16853200025SRui Paulo 	uint64_t progress_in;
16953200025SRui Paulo 
17053200025SRui Paulo 	/// Amount of compressed data in Stream Header + Blocks that
17153200025SRui Paulo 	/// have already been finished.
17253200025SRui Paulo 	uint64_t progress_out;
17353200025SRui Paulo 
17453200025SRui Paulo 
17553200025SRui Paulo 	mythread_mutex mutex;
17653200025SRui Paulo 	mythread_cond cond;
17753200025SRui Paulo };
17853200025SRui Paulo 
17953200025SRui Paulo 
18053200025SRui Paulo /// Tell the main thread that something has gone wrong.
18153200025SRui Paulo static void
18253200025SRui Paulo worker_error(worker_thread *thr, lzma_ret ret)
18353200025SRui Paulo {
18453200025SRui Paulo 	assert(ret != LZMA_OK);
18553200025SRui Paulo 	assert(ret != LZMA_STREAM_END);
18653200025SRui Paulo 
18753200025SRui Paulo 	mythread_sync(thr->coder->mutex) {
18853200025SRui Paulo 		if (thr->coder->thread_error == LZMA_OK)
18953200025SRui Paulo 			thr->coder->thread_error = ret;
19053200025SRui Paulo 
19153200025SRui Paulo 		mythread_cond_signal(&thr->coder->cond);
19253200025SRui Paulo 	}
19353200025SRui Paulo 
19453200025SRui Paulo 	return;
19553200025SRui Paulo }
19653200025SRui Paulo 
19753200025SRui Paulo 
19853200025SRui Paulo static worker_state
19953200025SRui Paulo worker_encode(worker_thread *thr, worker_state state)
20053200025SRui Paulo {
20153200025SRui Paulo 	assert(thr->progress_in == 0);
20253200025SRui Paulo 	assert(thr->progress_out == 0);
20353200025SRui Paulo 
20453200025SRui Paulo 	// Set the Block options.
20553200025SRui Paulo 	thr->block_options = (lzma_block){
20653200025SRui Paulo 		.version = 0,
20753200025SRui Paulo 		.check = thr->coder->stream_flags.check,
20853200025SRui Paulo 		.compressed_size = thr->coder->outq.buf_size_max,
20953200025SRui Paulo 		.uncompressed_size = thr->coder->block_size,
21053200025SRui Paulo 
21153200025SRui Paulo 		// TODO: To allow changing the filter chain, the filters
21253200025SRui Paulo 		// array must be copied to each worker_thread.
21353200025SRui Paulo 		.filters = thr->coder->filters,
21453200025SRui Paulo 	};
21553200025SRui Paulo 
21653200025SRui Paulo 	// Calculate maximum size of the Block Header. This amount is
21753200025SRui Paulo 	// reserved in the beginning of the buffer so that Block Header
21853200025SRui Paulo 	// along with Compressed Size and Uncompressed Size can be
21953200025SRui Paulo 	// written there.
22053200025SRui Paulo 	lzma_ret ret = lzma_block_header_size(&thr->block_options);
22153200025SRui Paulo 	if (ret != LZMA_OK) {
22253200025SRui Paulo 		worker_error(thr, ret);
22353200025SRui Paulo 		return THR_STOP;
22453200025SRui Paulo 	}
22553200025SRui Paulo 
22653200025SRui Paulo 	// Initialize the Block encoder.
22753200025SRui Paulo 	ret = lzma_block_encoder_init(&thr->block_encoder,
22853200025SRui Paulo 			thr->allocator, &thr->block_options);
22953200025SRui Paulo 	if (ret != LZMA_OK) {
23053200025SRui Paulo 		worker_error(thr, ret);
23153200025SRui Paulo 		return THR_STOP;
23253200025SRui Paulo 	}
23353200025SRui Paulo 
23453200025SRui Paulo 	size_t in_pos = 0;
23553200025SRui Paulo 	size_t in_size = 0;
23653200025SRui Paulo 
23753200025SRui Paulo 	thr->outbuf->size = thr->block_options.header_size;
23853200025SRui Paulo 	const size_t out_size = thr->coder->outq.buf_size_max;
23953200025SRui Paulo 
24053200025SRui Paulo 	do {
24153200025SRui Paulo 		mythread_sync(thr->mutex) {
24253200025SRui Paulo 			// Store in_pos and out_pos into *thr so that
24353200025SRui Paulo 			// an application may read them via
24453200025SRui Paulo 			// lzma_get_progress() to get progress information.
24553200025SRui Paulo 			//
24653200025SRui Paulo 			// NOTE: These aren't updated when the encoding
24753200025SRui Paulo 			// finishes. Instead, the final values are taken
24853200025SRui Paulo 			// later from thr->outbuf.
24953200025SRui Paulo 			thr->progress_in = in_pos;
25053200025SRui Paulo 			thr->progress_out = thr->outbuf->size;
25153200025SRui Paulo 
25253200025SRui Paulo 			while (in_size == thr->in_size
25353200025SRui Paulo 					&& thr->state == THR_RUN)
25453200025SRui Paulo 				mythread_cond_wait(&thr->cond, &thr->mutex);
25553200025SRui Paulo 
25653200025SRui Paulo 			state = thr->state;
25753200025SRui Paulo 			in_size = thr->in_size;
25853200025SRui Paulo 		}
25953200025SRui Paulo 
26053200025SRui Paulo 		// Return if we were asked to stop or exit.
26153200025SRui Paulo 		if (state >= THR_STOP)
26253200025SRui Paulo 			return state;
26353200025SRui Paulo 
26453200025SRui Paulo 		lzma_action action = state == THR_FINISH
26553200025SRui Paulo 				? LZMA_FINISH : LZMA_RUN;
26653200025SRui Paulo 
26753200025SRui Paulo 		// Limit the amount of input given to the Block encoder
26853200025SRui Paulo 		// at once. This way this thread can react fairly quickly
26953200025SRui Paulo 		// if the main thread wants us to stop or exit.
27053200025SRui Paulo 		static const size_t in_chunk_max = 16384;
27153200025SRui Paulo 		size_t in_limit = in_size;
27253200025SRui Paulo 		if (in_size - in_pos > in_chunk_max) {
27353200025SRui Paulo 			in_limit = in_pos + in_chunk_max;
27453200025SRui Paulo 			action = LZMA_RUN;
27553200025SRui Paulo 		}
27653200025SRui Paulo 
27753200025SRui Paulo 		ret = thr->block_encoder.code(
27853200025SRui Paulo 				thr->block_encoder.coder, thr->allocator,
27953200025SRui Paulo 				thr->in, &in_pos, in_limit, thr->outbuf->buf,
28053200025SRui Paulo 				&thr->outbuf->size, out_size, action);
28153200025SRui Paulo 	} while (ret == LZMA_OK && thr->outbuf->size < out_size);
28253200025SRui Paulo 
28353200025SRui Paulo 	switch (ret) {
28453200025SRui Paulo 	case LZMA_STREAM_END:
28553200025SRui Paulo 		assert(state == THR_FINISH);
28653200025SRui Paulo 
28753200025SRui Paulo 		// Encode the Block Header. By doing it after
28853200025SRui Paulo 		// the compression, we can store the Compressed Size
28953200025SRui Paulo 		// and Uncompressed Size fields.
29053200025SRui Paulo 		ret = lzma_block_header_encode(&thr->block_options,
29153200025SRui Paulo 				thr->outbuf->buf);
29253200025SRui Paulo 		if (ret != LZMA_OK) {
29353200025SRui Paulo 			worker_error(thr, ret);
29453200025SRui Paulo 			return THR_STOP;
29553200025SRui Paulo 		}
29653200025SRui Paulo 
29753200025SRui Paulo 		break;
29853200025SRui Paulo 
29953200025SRui Paulo 	case LZMA_OK:
30053200025SRui Paulo 		// The data was incompressible. Encode it using uncompressed
30153200025SRui Paulo 		// LZMA2 chunks.
30253200025SRui Paulo 		//
30353200025SRui Paulo 		// First wait that we have gotten all the input.
30453200025SRui Paulo 		mythread_sync(thr->mutex) {
30553200025SRui Paulo 			while (thr->state == THR_RUN)
30653200025SRui Paulo 				mythread_cond_wait(&thr->cond, &thr->mutex);
30753200025SRui Paulo 
30853200025SRui Paulo 			state = thr->state;
30953200025SRui Paulo 			in_size = thr->in_size;
31053200025SRui Paulo 		}
31153200025SRui Paulo 
31253200025SRui Paulo 		if (state >= THR_STOP)
31353200025SRui Paulo 			return state;
31453200025SRui Paulo 
31553200025SRui Paulo 		// Do the encoding. This takes care of the Block Header too.
31653200025SRui Paulo 		thr->outbuf->size = 0;
31753200025SRui Paulo 		ret = lzma_block_uncomp_encode(&thr->block_options,
31853200025SRui Paulo 				thr->in, in_size, thr->outbuf->buf,
31953200025SRui Paulo 				&thr->outbuf->size, out_size);
32053200025SRui Paulo 
32153200025SRui Paulo 		// It shouldn't fail.
32253200025SRui Paulo 		if (ret != LZMA_OK) {
32353200025SRui Paulo 			worker_error(thr, LZMA_PROG_ERROR);
32453200025SRui Paulo 			return THR_STOP;
32553200025SRui Paulo 		}
32653200025SRui Paulo 
32753200025SRui Paulo 		break;
32853200025SRui Paulo 
32953200025SRui Paulo 	default:
33053200025SRui Paulo 		worker_error(thr, ret);
33153200025SRui Paulo 		return THR_STOP;
33253200025SRui Paulo 	}
33353200025SRui Paulo 
33453200025SRui Paulo 	// Set the size information that will be read by the main thread
33553200025SRui Paulo 	// to write the Index field.
33653200025SRui Paulo 	thr->outbuf->unpadded_size
33753200025SRui Paulo 			= lzma_block_unpadded_size(&thr->block_options);
33853200025SRui Paulo 	assert(thr->outbuf->unpadded_size != 0);
33953200025SRui Paulo 	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
34053200025SRui Paulo 
34153200025SRui Paulo 	return THR_FINISH;
34253200025SRui Paulo }
34353200025SRui Paulo 
34453200025SRui Paulo 
34553200025SRui Paulo static MYTHREAD_RET_TYPE
34653200025SRui Paulo worker_start(void *thr_ptr)
34753200025SRui Paulo {
34853200025SRui Paulo 	worker_thread *thr = thr_ptr;
34953200025SRui Paulo 	worker_state state = THR_IDLE; // Init to silence a warning
35053200025SRui Paulo 
35153200025SRui Paulo 	while (true) {
35253200025SRui Paulo 		// Wait for work.
35353200025SRui Paulo 		mythread_sync(thr->mutex) {
35453200025SRui Paulo 			while (true) {
35553200025SRui Paulo 				// The thread is already idle so if we are
35653200025SRui Paulo 				// requested to stop, just set the state.
35753200025SRui Paulo 				if (thr->state == THR_STOP) {
35853200025SRui Paulo 					thr->state = THR_IDLE;
35953200025SRui Paulo 					mythread_cond_signal(&thr->cond);
36053200025SRui Paulo 				}
36153200025SRui Paulo 
36253200025SRui Paulo 				state = thr->state;
36353200025SRui Paulo 				if (state != THR_IDLE)
36453200025SRui Paulo 					break;
36553200025SRui Paulo 
36653200025SRui Paulo 				mythread_cond_wait(&thr->cond, &thr->mutex);
36753200025SRui Paulo 			}
36853200025SRui Paulo 		}
36953200025SRui Paulo 
37053200025SRui Paulo 		assert(state != THR_IDLE);
37153200025SRui Paulo 		assert(state != THR_STOP);
37253200025SRui Paulo 
37353200025SRui Paulo 		if (state <= THR_FINISH)
37453200025SRui Paulo 			state = worker_encode(thr, state);
37553200025SRui Paulo 
37653200025SRui Paulo 		if (state == THR_EXIT)
37753200025SRui Paulo 			break;
37853200025SRui Paulo 
37953200025SRui Paulo 		// Mark the thread as idle unless the main thread has
38053200025SRui Paulo 		// told us to exit. Signal is needed for the case
38153200025SRui Paulo 		// where the main thread is waiting for the threads to stop.
38253200025SRui Paulo 		mythread_sync(thr->mutex) {
38353200025SRui Paulo 			if (thr->state != THR_EXIT) {
38453200025SRui Paulo 				thr->state = THR_IDLE;
38553200025SRui Paulo 				mythread_cond_signal(&thr->cond);
38653200025SRui Paulo 			}
38753200025SRui Paulo 		}
38853200025SRui Paulo 
38953200025SRui Paulo 		mythread_sync(thr->coder->mutex) {
39053200025SRui Paulo 			// Mark the output buffer as finished if
39153200025SRui Paulo 			// no errors occurred.
39253200025SRui Paulo 			thr->outbuf->finished = state == THR_FINISH;
39353200025SRui Paulo 
39453200025SRui Paulo 			// Update the main progress info.
39553200025SRui Paulo 			thr->coder->progress_in
39653200025SRui Paulo 					+= thr->outbuf->uncompressed_size;
39753200025SRui Paulo 			thr->coder->progress_out += thr->outbuf->size;
39853200025SRui Paulo 			thr->progress_in = 0;
39953200025SRui Paulo 			thr->progress_out = 0;
40053200025SRui Paulo 
40153200025SRui Paulo 			// Return this thread to the stack of free threads.
40253200025SRui Paulo 			thr->next = thr->coder->threads_free;
40353200025SRui Paulo 			thr->coder->threads_free = thr;
40453200025SRui Paulo 
40553200025SRui Paulo 			mythread_cond_signal(&thr->coder->cond);
40653200025SRui Paulo 		}
40753200025SRui Paulo 	}
40853200025SRui Paulo 
40953200025SRui Paulo 	// Exiting, free the resources.
41053200025SRui Paulo 	mythread_mutex_destroy(&thr->mutex);
41153200025SRui Paulo 	mythread_cond_destroy(&thr->cond);
41253200025SRui Paulo 
41353200025SRui Paulo 	lzma_next_end(&thr->block_encoder, thr->allocator);
41453200025SRui Paulo 	lzma_free(thr->in, thr->allocator);
41553200025SRui Paulo 	return MYTHREAD_RET_VALUE;
41653200025SRui Paulo }
41753200025SRui Paulo 
41853200025SRui Paulo 
41953200025SRui Paulo /// Make the threads stop but not exit. Optionally wait for them to stop.
42053200025SRui Paulo static void
421*1456f0f9SXin LI threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
42253200025SRui Paulo {
42353200025SRui Paulo 	// Tell the threads to stop.
42453200025SRui Paulo 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
42553200025SRui Paulo 		mythread_sync(coder->threads[i].mutex) {
42653200025SRui Paulo 			coder->threads[i].state = THR_STOP;
42753200025SRui Paulo 			mythread_cond_signal(&coder->threads[i].cond);
42853200025SRui Paulo 		}
42953200025SRui Paulo 	}
43053200025SRui Paulo 
43153200025SRui Paulo 	if (!wait_for_threads)
43253200025SRui Paulo 		return;
43353200025SRui Paulo 
43453200025SRui Paulo 	// Wait for the threads to settle in the idle state.
43553200025SRui Paulo 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
43653200025SRui Paulo 		mythread_sync(coder->threads[i].mutex) {
43753200025SRui Paulo 			while (coder->threads[i].state != THR_IDLE)
43853200025SRui Paulo 				mythread_cond_wait(&coder->threads[i].cond,
43953200025SRui Paulo 						&coder->threads[i].mutex);
44053200025SRui Paulo 		}
44153200025SRui Paulo 	}
44253200025SRui Paulo 
44353200025SRui Paulo 	return;
44453200025SRui Paulo }
44553200025SRui Paulo 
44653200025SRui Paulo 
44753200025SRui Paulo /// Stop the threads and free the resources associated with them.
44853200025SRui Paulo /// Wait until the threads have exited.
44953200025SRui Paulo static void
450*1456f0f9SXin LI threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
45153200025SRui Paulo {
45253200025SRui Paulo 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
45353200025SRui Paulo 		mythread_sync(coder->threads[i].mutex) {
45453200025SRui Paulo 			coder->threads[i].state = THR_EXIT;
45553200025SRui Paulo 			mythread_cond_signal(&coder->threads[i].cond);
45653200025SRui Paulo 		}
45753200025SRui Paulo 	}
45853200025SRui Paulo 
45953200025SRui Paulo 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
46053200025SRui Paulo 		int ret = mythread_join(coder->threads[i].thread_id);
46153200025SRui Paulo 		assert(ret == 0);
46253200025SRui Paulo 		(void)ret;
46353200025SRui Paulo 	}
46453200025SRui Paulo 
46553200025SRui Paulo 	lzma_free(coder->threads, allocator);
46653200025SRui Paulo 	return;
46753200025SRui Paulo }
46853200025SRui Paulo 
46953200025SRui Paulo 
47053200025SRui Paulo /// Initialize a new worker_thread structure and create a new thread.
47153200025SRui Paulo static lzma_ret
472*1456f0f9SXin LI initialize_new_thread(lzma_stream_coder *coder,
473*1456f0f9SXin LI 		const lzma_allocator *allocator)
47453200025SRui Paulo {
47553200025SRui Paulo 	worker_thread *thr = &coder->threads[coder->threads_initialized];
47653200025SRui Paulo 
47753200025SRui Paulo 	thr->in = lzma_alloc(coder->block_size, allocator);
47853200025SRui Paulo 	if (thr->in == NULL)
47953200025SRui Paulo 		return LZMA_MEM_ERROR;
48053200025SRui Paulo 
48153200025SRui Paulo 	if (mythread_mutex_init(&thr->mutex))
48253200025SRui Paulo 		goto error_mutex;
48353200025SRui Paulo 
48453200025SRui Paulo 	if (mythread_cond_init(&thr->cond))
48553200025SRui Paulo 		goto error_cond;
48653200025SRui Paulo 
48753200025SRui Paulo 	thr->state = THR_IDLE;
48853200025SRui Paulo 	thr->allocator = allocator;
48953200025SRui Paulo 	thr->coder = coder;
49053200025SRui Paulo 	thr->progress_in = 0;
49153200025SRui Paulo 	thr->progress_out = 0;
49253200025SRui Paulo 	thr->block_encoder = LZMA_NEXT_CODER_INIT;
49353200025SRui Paulo 
49453200025SRui Paulo 	if (mythread_create(&thr->thread_id, &worker_start, thr))
49553200025SRui Paulo 		goto error_thread;
49653200025SRui Paulo 
49753200025SRui Paulo 	++coder->threads_initialized;
49853200025SRui Paulo 	coder->thr = thr;
49953200025SRui Paulo 
50053200025SRui Paulo 	return LZMA_OK;
50153200025SRui Paulo 
50253200025SRui Paulo error_thread:
50353200025SRui Paulo 	mythread_cond_destroy(&thr->cond);
50453200025SRui Paulo 
50553200025SRui Paulo error_cond:
50653200025SRui Paulo 	mythread_mutex_destroy(&thr->mutex);
50753200025SRui Paulo 
50853200025SRui Paulo error_mutex:
50953200025SRui Paulo 	lzma_free(thr->in, allocator);
51053200025SRui Paulo 	return LZMA_MEM_ERROR;
51153200025SRui Paulo }
51253200025SRui Paulo 
51353200025SRui Paulo 
51453200025SRui Paulo static lzma_ret
515*1456f0f9SXin LI get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
51653200025SRui Paulo {
51753200025SRui Paulo 	// If there are no free output subqueues, there is no
51853200025SRui Paulo 	// point to try getting a thread.
51953200025SRui Paulo 	if (!lzma_outq_has_buf(&coder->outq))
52053200025SRui Paulo 		return LZMA_OK;
52153200025SRui Paulo 
52253200025SRui Paulo 	// If there is a free structure on the stack, use it.
52353200025SRui Paulo 	mythread_sync(coder->mutex) {
52453200025SRui Paulo 		if (coder->threads_free != NULL) {
52553200025SRui Paulo 			coder->thr = coder->threads_free;
52653200025SRui Paulo 			coder->threads_free = coder->threads_free->next;
52753200025SRui Paulo 		}
52853200025SRui Paulo 	}
52953200025SRui Paulo 
53053200025SRui Paulo 	if (coder->thr == NULL) {
53153200025SRui Paulo 		// If there are no uninitialized structures left, return.
53253200025SRui Paulo 		if (coder->threads_initialized == coder->threads_max)
53353200025SRui Paulo 			return LZMA_OK;
53453200025SRui Paulo 
53553200025SRui Paulo 		// Initialize a new thread.
53653200025SRui Paulo 		return_if_error(initialize_new_thread(coder, allocator));
53753200025SRui Paulo 	}
53853200025SRui Paulo 
53953200025SRui Paulo 	// Reset the parts of the thread state that have to be done
54053200025SRui Paulo 	// in the main thread.
54153200025SRui Paulo 	mythread_sync(coder->thr->mutex) {
54253200025SRui Paulo 		coder->thr->state = THR_RUN;
54353200025SRui Paulo 		coder->thr->in_size = 0;
54453200025SRui Paulo 		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
54553200025SRui Paulo 		mythread_cond_signal(&coder->thr->cond);
54653200025SRui Paulo 	}
54753200025SRui Paulo 
54853200025SRui Paulo 	return LZMA_OK;
54953200025SRui Paulo }
55053200025SRui Paulo 
55153200025SRui Paulo 
55253200025SRui Paulo static lzma_ret
553*1456f0f9SXin LI stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
55453200025SRui Paulo 		const uint8_t *restrict in, size_t *restrict in_pos,
55553200025SRui Paulo 		size_t in_size, lzma_action action)
55653200025SRui Paulo {
55753200025SRui Paulo 	while (*in_pos < in_size
55853200025SRui Paulo 			|| (coder->thr != NULL && action != LZMA_RUN)) {
55953200025SRui Paulo 		if (coder->thr == NULL) {
56053200025SRui Paulo 			// Get a new thread.
56153200025SRui Paulo 			const lzma_ret ret = get_thread(coder, allocator);
56253200025SRui Paulo 			if (coder->thr == NULL)
56353200025SRui Paulo 				return ret;
56453200025SRui Paulo 		}
56553200025SRui Paulo 
56653200025SRui Paulo 		// Copy the input data to thread's buffer.
56753200025SRui Paulo 		size_t thr_in_size = coder->thr->in_size;
56853200025SRui Paulo 		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
56953200025SRui Paulo 				&thr_in_size, coder->block_size);
57053200025SRui Paulo 
57153200025SRui Paulo 		// Tell the Block encoder to finish if
57253200025SRui Paulo 		//  - it has got block_size bytes of input; or
57353200025SRui Paulo 		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
57453200025SRui Paulo 		//    or LZMA_FULL_BARRIER was used.
57553200025SRui Paulo 		//
57653200025SRui Paulo 		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
57753200025SRui Paulo 		const bool finish = thr_in_size == coder->block_size
57853200025SRui Paulo 				|| (*in_pos == in_size && action != LZMA_RUN);
57953200025SRui Paulo 
58053200025SRui Paulo 		bool block_error = false;
58153200025SRui Paulo 
58253200025SRui Paulo 		mythread_sync(coder->thr->mutex) {
58353200025SRui Paulo 			if (coder->thr->state == THR_IDLE) {
58453200025SRui Paulo 				// Something has gone wrong with the Block
58553200025SRui Paulo 				// encoder. It has set coder->thread_error
58653200025SRui Paulo 				// which we will read a few lines later.
58753200025SRui Paulo 				block_error = true;
58853200025SRui Paulo 			} else {
58953200025SRui Paulo 				// Tell the Block encoder its new amount
59053200025SRui Paulo 				// of input and update the state if needed.
59153200025SRui Paulo 				coder->thr->in_size = thr_in_size;
59253200025SRui Paulo 
59353200025SRui Paulo 				if (finish)
59453200025SRui Paulo 					coder->thr->state = THR_FINISH;
59553200025SRui Paulo 
59653200025SRui Paulo 				mythread_cond_signal(&coder->thr->cond);
59753200025SRui Paulo 			}
59853200025SRui Paulo 		}
59953200025SRui Paulo 
60053200025SRui Paulo 		if (block_error) {
60153200025SRui Paulo 			lzma_ret ret;
60253200025SRui Paulo 
60353200025SRui Paulo 			mythread_sync(coder->mutex) {
60453200025SRui Paulo 				ret = coder->thread_error;
60553200025SRui Paulo 			}
60653200025SRui Paulo 
60753200025SRui Paulo 			return ret;
60853200025SRui Paulo 		}
60953200025SRui Paulo 
61053200025SRui Paulo 		if (finish)
61153200025SRui Paulo 			coder->thr = NULL;
61253200025SRui Paulo 	}
61353200025SRui Paulo 
61453200025SRui Paulo 	return LZMA_OK;
61553200025SRui Paulo }
61653200025SRui Paulo 
61753200025SRui Paulo 
61853200025SRui Paulo /// Wait until more input can be consumed, more output can be read, or
61953200025SRui Paulo /// an optional timeout is reached.
62053200025SRui Paulo static bool
621*1456f0f9SXin LI wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
62253200025SRui Paulo 		bool *has_blocked, bool has_input)
62353200025SRui Paulo {
62453200025SRui Paulo 	if (coder->timeout != 0 && !*has_blocked) {
62553200025SRui Paulo 		// Every time when stream_encode_mt() is called via
62653200025SRui Paulo 		// lzma_code(), *has_blocked starts as false. We set it
62753200025SRui Paulo 		// to true here and calculate the absolute time when
62853200025SRui Paulo 		// we must return if there's nothing to do.
62953200025SRui Paulo 		//
63053200025SRui Paulo 		// The idea of *has_blocked is to avoid unneeded calls
63153200025SRui Paulo 		// to mythread_condtime_set(), which may do a syscall
63253200025SRui Paulo 		// depending on the operating system.
63353200025SRui Paulo 		*has_blocked = true;
63453200025SRui Paulo 		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
63553200025SRui Paulo 	}
63653200025SRui Paulo 
63753200025SRui Paulo 	bool timed_out = false;
63853200025SRui Paulo 
63953200025SRui Paulo 	mythread_sync(coder->mutex) {
64053200025SRui Paulo 		// There are four things that we wait. If one of them
64153200025SRui Paulo 		// becomes possible, we return.
64253200025SRui Paulo 		//  - If there is input left, we need to get a free
64353200025SRui Paulo 		//    worker thread and an output buffer for it.
64453200025SRui Paulo 		//  - Data ready to be read from the output queue.
64553200025SRui Paulo 		//  - A worker thread indicates an error.
64653200025SRui Paulo 		//  - Time out occurs.
64753200025SRui Paulo 		while ((!has_input || coder->threads_free == NULL
64853200025SRui Paulo 					|| !lzma_outq_has_buf(&coder->outq))
64953200025SRui Paulo 				&& !lzma_outq_is_readable(&coder->outq)
65053200025SRui Paulo 				&& coder->thread_error == LZMA_OK
65153200025SRui Paulo 				&& !timed_out) {
65253200025SRui Paulo 			if (coder->timeout != 0)
65353200025SRui Paulo 				timed_out = mythread_cond_timedwait(
65453200025SRui Paulo 						&coder->cond, &coder->mutex,
65553200025SRui Paulo 						wait_abs) != 0;
65653200025SRui Paulo 			else
65753200025SRui Paulo 				mythread_cond_wait(&coder->cond,
65853200025SRui Paulo 						&coder->mutex);
65953200025SRui Paulo 		}
66053200025SRui Paulo 	}
66153200025SRui Paulo 
66253200025SRui Paulo 	return timed_out;
66353200025SRui Paulo }
66453200025SRui Paulo 
66553200025SRui Paulo 
66653200025SRui Paulo static lzma_ret
667*1456f0f9SXin LI stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
66853200025SRui Paulo 		const uint8_t *restrict in, size_t *restrict in_pos,
66953200025SRui Paulo 		size_t in_size, uint8_t *restrict out,
67053200025SRui Paulo 		size_t *restrict out_pos, size_t out_size, lzma_action action)
67153200025SRui Paulo {
672*1456f0f9SXin LI 	lzma_stream_coder *coder = coder_ptr;
673*1456f0f9SXin LI 
67453200025SRui Paulo 	switch (coder->sequence) {
67553200025SRui Paulo 	case SEQ_STREAM_HEADER:
67653200025SRui Paulo 		lzma_bufcpy(coder->header, &coder->header_pos,
67753200025SRui Paulo 				sizeof(coder->header),
67853200025SRui Paulo 				out, out_pos, out_size);
67953200025SRui Paulo 		if (coder->header_pos < sizeof(coder->header))
68053200025SRui Paulo 			return LZMA_OK;
68153200025SRui Paulo 
68253200025SRui Paulo 		coder->header_pos = 0;
68353200025SRui Paulo 		coder->sequence = SEQ_BLOCK;
68453200025SRui Paulo 
68553200025SRui Paulo 	// Fall through
68653200025SRui Paulo 
68753200025SRui Paulo 	case SEQ_BLOCK: {
68853200025SRui Paulo 		// Initialized to silence warnings.
68953200025SRui Paulo 		lzma_vli unpadded_size = 0;
69053200025SRui Paulo 		lzma_vli uncompressed_size = 0;
69153200025SRui Paulo 		lzma_ret ret = LZMA_OK;
69253200025SRui Paulo 
69353200025SRui Paulo 		// These are for wait_for_work().
69453200025SRui Paulo 		bool has_blocked = false;
69553200025SRui Paulo 		mythread_condtime wait_abs;
69653200025SRui Paulo 
69753200025SRui Paulo 		while (true) {
69853200025SRui Paulo 			mythread_sync(coder->mutex) {
69953200025SRui Paulo 				// Check for Block encoder errors.
70053200025SRui Paulo 				ret = coder->thread_error;
70153200025SRui Paulo 				if (ret != LZMA_OK) {
70253200025SRui Paulo 					assert(ret != LZMA_STREAM_END);
70353200025SRui Paulo 					break;
70453200025SRui Paulo 				}
70553200025SRui Paulo 
70653200025SRui Paulo 				// Try to read compressed data to out[].
70753200025SRui Paulo 				ret = lzma_outq_read(&coder->outq,
70853200025SRui Paulo 						out, out_pos, out_size,
70953200025SRui Paulo 						&unpadded_size,
71053200025SRui Paulo 						&uncompressed_size);
71153200025SRui Paulo 			}
71253200025SRui Paulo 
71353200025SRui Paulo 			if (ret == LZMA_STREAM_END) {
71453200025SRui Paulo 				// End of Block. Add it to the Index.
71553200025SRui Paulo 				ret = lzma_index_append(coder->index,
71653200025SRui Paulo 						allocator, unpadded_size,
71753200025SRui Paulo 						uncompressed_size);
71853200025SRui Paulo 
71953200025SRui Paulo 				// If we didn't fill the output buffer yet,
72053200025SRui Paulo 				// try to read more data. Maybe the next
72153200025SRui Paulo 				// outbuf has been finished already too.
72253200025SRui Paulo 				if (*out_pos < out_size)
72353200025SRui Paulo 					continue;
72453200025SRui Paulo 			}
72553200025SRui Paulo 
72653200025SRui Paulo 			if (ret != LZMA_OK) {
72753200025SRui Paulo 				// coder->thread_error was set or
72853200025SRui Paulo 				// lzma_index_append() failed.
72953200025SRui Paulo 				threads_stop(coder, false);
73053200025SRui Paulo 				return ret;
73153200025SRui Paulo 			}
73253200025SRui Paulo 
73353200025SRui Paulo 			// Try to give uncompressed data to a worker thread.
73453200025SRui Paulo 			ret = stream_encode_in(coder, allocator,
73553200025SRui Paulo 					in, in_pos, in_size, action);
73653200025SRui Paulo 			if (ret != LZMA_OK) {
73753200025SRui Paulo 				threads_stop(coder, false);
73853200025SRui Paulo 				return ret;
73953200025SRui Paulo 			}
74053200025SRui Paulo 
74153200025SRui Paulo 			// See if we should wait or return.
74253200025SRui Paulo 			//
74353200025SRui Paulo 			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
74453200025SRui Paulo 			if (*in_pos == in_size) {
74553200025SRui Paulo 				// LZMA_RUN: More data is probably coming
74653200025SRui Paulo 				// so return to let the caller fill the
74753200025SRui Paulo 				// input buffer.
74853200025SRui Paulo 				if (action == LZMA_RUN)
74953200025SRui Paulo 					return LZMA_OK;
75053200025SRui Paulo 
75153200025SRui Paulo 				// LZMA_FULL_BARRIER: The same as with
75253200025SRui Paulo 				// LZMA_RUN but tell the caller that the
75353200025SRui Paulo 				// barrier was completed.
75453200025SRui Paulo 				if (action == LZMA_FULL_BARRIER)
75553200025SRui Paulo 					return LZMA_STREAM_END;
75653200025SRui Paulo 
75753200025SRui Paulo 				// Finishing or flushing isn't completed until
75853200025SRui Paulo 				// all input data has been encoded and copied
75953200025SRui Paulo 				// to the output buffer.
76053200025SRui Paulo 				if (lzma_outq_is_empty(&coder->outq)) {
76153200025SRui Paulo 					// LZMA_FINISH: Continue to encode
76253200025SRui Paulo 					// the Index field.
76353200025SRui Paulo 					if (action == LZMA_FINISH)
76453200025SRui Paulo 						break;
76553200025SRui Paulo 
76653200025SRui Paulo 					// LZMA_FULL_FLUSH: Return to tell
76753200025SRui Paulo 					// the caller that flushing was
76853200025SRui Paulo 					// completed.
76953200025SRui Paulo 					if (action == LZMA_FULL_FLUSH)
77053200025SRui Paulo 						return LZMA_STREAM_END;
77153200025SRui Paulo 				}
77253200025SRui Paulo 			}
77353200025SRui Paulo 
77453200025SRui Paulo 			// Return if there is no output space left.
77553200025SRui Paulo 			// This check must be done after testing the input
77653200025SRui Paulo 			// buffer, because we might want to use a different
77753200025SRui Paulo 			// return code.
77853200025SRui Paulo 			if (*out_pos == out_size)
77953200025SRui Paulo 				return LZMA_OK;
78053200025SRui Paulo 
78153200025SRui Paulo 			// Neither in nor out has been used completely.
78253200025SRui Paulo 			// Wait until there's something we can do.
78353200025SRui Paulo 			if (wait_for_work(coder, &wait_abs, &has_blocked,
78453200025SRui Paulo 					*in_pos < in_size))
78553200025SRui Paulo 				return LZMA_TIMED_OUT;
78653200025SRui Paulo 		}
78753200025SRui Paulo 
78853200025SRui Paulo 		// All Blocks have been encoded and the threads have stopped.
78953200025SRui Paulo 		// Prepare to encode the Index field.
79053200025SRui Paulo 		return_if_error(lzma_index_encoder_init(
79153200025SRui Paulo 				&coder->index_encoder, allocator,
79253200025SRui Paulo 				coder->index));
79353200025SRui Paulo 		coder->sequence = SEQ_INDEX;
79453200025SRui Paulo 
79553200025SRui Paulo 		// Update the progress info to take the Index and
79653200025SRui Paulo 		// Stream Footer into account. Those are very fast to encode
79753200025SRui Paulo 		// so in terms of progress information they can be thought
79853200025SRui Paulo 		// to be ready to be copied out.
79953200025SRui Paulo 		coder->progress_out += lzma_index_size(coder->index)
80053200025SRui Paulo 				+ LZMA_STREAM_HEADER_SIZE;
80153200025SRui Paulo 	}
80253200025SRui Paulo 
80353200025SRui Paulo 	// Fall through
80453200025SRui Paulo 
80553200025SRui Paulo 	case SEQ_INDEX: {
80653200025SRui Paulo 		// Call the Index encoder. It doesn't take any input, so
80753200025SRui Paulo 		// those pointers can be NULL.
80853200025SRui Paulo 		const lzma_ret ret = coder->index_encoder.code(
80953200025SRui Paulo 				coder->index_encoder.coder, allocator,
81053200025SRui Paulo 				NULL, NULL, 0,
81153200025SRui Paulo 				out, out_pos, out_size, LZMA_RUN);
81253200025SRui Paulo 		if (ret != LZMA_STREAM_END)
81353200025SRui Paulo 			return ret;
81453200025SRui Paulo 
81553200025SRui Paulo 		// Encode the Stream Footer into coder->buffer.
81653200025SRui Paulo 		coder->stream_flags.backward_size
81753200025SRui Paulo 				= lzma_index_size(coder->index);
81853200025SRui Paulo 		if (lzma_stream_footer_encode(&coder->stream_flags,
81953200025SRui Paulo 				coder->header) != LZMA_OK)
82053200025SRui Paulo 			return LZMA_PROG_ERROR;
82153200025SRui Paulo 
82253200025SRui Paulo 		coder->sequence = SEQ_STREAM_FOOTER;
82353200025SRui Paulo 	}
82453200025SRui Paulo 
82553200025SRui Paulo 	// Fall through
82653200025SRui Paulo 
82753200025SRui Paulo 	case SEQ_STREAM_FOOTER:
82853200025SRui Paulo 		lzma_bufcpy(coder->header, &coder->header_pos,
82953200025SRui Paulo 				sizeof(coder->header),
83053200025SRui Paulo 				out, out_pos, out_size);
83153200025SRui Paulo 		return coder->header_pos < sizeof(coder->header)
83253200025SRui Paulo 				? LZMA_OK : LZMA_STREAM_END;
83353200025SRui Paulo 	}
83453200025SRui Paulo 
83553200025SRui Paulo 	assert(0);
83653200025SRui Paulo 	return LZMA_PROG_ERROR;
83753200025SRui Paulo }
83853200025SRui Paulo 
83953200025SRui Paulo 
84053200025SRui Paulo static void
841*1456f0f9SXin LI stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
84253200025SRui Paulo {
843*1456f0f9SXin LI 	lzma_stream_coder *coder = coder_ptr;
844*1456f0f9SXin LI 
84553200025SRui Paulo 	// Threads must be killed before the output queue can be freed.
84653200025SRui Paulo 	threads_end(coder, allocator);
84753200025SRui Paulo 	lzma_outq_end(&coder->outq, allocator);
84853200025SRui Paulo 
84953200025SRui Paulo 	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
85053200025SRui Paulo 		lzma_free(coder->filters[i].options, allocator);
85153200025SRui Paulo 
85253200025SRui Paulo 	lzma_next_end(&coder->index_encoder, allocator);
85353200025SRui Paulo 	lzma_index_end(coder->index, allocator);
85453200025SRui Paulo 
85553200025SRui Paulo 	mythread_cond_destroy(&coder->cond);
85653200025SRui Paulo 	mythread_mutex_destroy(&coder->mutex);
85753200025SRui Paulo 
85853200025SRui Paulo 	lzma_free(coder, allocator);
85953200025SRui Paulo 	return;
86053200025SRui Paulo }
86153200025SRui Paulo 
86253200025SRui Paulo 
86353200025SRui Paulo /// Options handling for lzma_stream_encoder_mt_init() and
86453200025SRui Paulo /// lzma_stream_encoder_mt_memusage()
86553200025SRui Paulo static lzma_ret
86653200025SRui Paulo get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
86753200025SRui Paulo 		const lzma_filter **filters, uint64_t *block_size,
86853200025SRui Paulo 		uint64_t *outbuf_size_max)
86953200025SRui Paulo {
87053200025SRui Paulo 	// Validate some of the options.
87153200025SRui Paulo 	if (options == NULL)
87253200025SRui Paulo 		return LZMA_PROG_ERROR;
87353200025SRui Paulo 
87453200025SRui Paulo 	if (options->flags != 0 || options->threads == 0
87553200025SRui Paulo 			|| options->threads > LZMA_THREADS_MAX)
87653200025SRui Paulo 		return LZMA_OPTIONS_ERROR;
87753200025SRui Paulo 
87853200025SRui Paulo 	if (options->filters != NULL) {
87953200025SRui Paulo 		// Filter chain was given, use it as is.
88053200025SRui Paulo 		*filters = options->filters;
88153200025SRui Paulo 	} else {
88253200025SRui Paulo 		// Use a preset.
88353200025SRui Paulo 		if (lzma_easy_preset(opt_easy, options->preset))
88453200025SRui Paulo 			return LZMA_OPTIONS_ERROR;
88553200025SRui Paulo 
88653200025SRui Paulo 		*filters = opt_easy->filters;
88753200025SRui Paulo 	}
88853200025SRui Paulo 
88953200025SRui Paulo 	// Block size
89053200025SRui Paulo 	if (options->block_size > 0) {
89153200025SRui Paulo 		if (options->block_size > BLOCK_SIZE_MAX)
89253200025SRui Paulo 			return LZMA_OPTIONS_ERROR;
89353200025SRui Paulo 
89453200025SRui Paulo 		*block_size = options->block_size;
89553200025SRui Paulo 	} else {
89653200025SRui Paulo 		// Determine the Block size from the filter chain.
89753200025SRui Paulo 		*block_size = lzma_mt_block_size(*filters);
89853200025SRui Paulo 		if (*block_size == 0)
89953200025SRui Paulo 			return LZMA_OPTIONS_ERROR;
90053200025SRui Paulo 
90153200025SRui Paulo 		assert(*block_size <= BLOCK_SIZE_MAX);
90253200025SRui Paulo 	}
90353200025SRui Paulo 
90453200025SRui Paulo 	// Calculate the maximum amount output that a single output buffer
90553200025SRui Paulo 	// may need to hold. This is the same as the maximum total size of
90653200025SRui Paulo 	// a Block.
90753200025SRui Paulo 	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
90853200025SRui Paulo 	if (*outbuf_size_max == 0)
90953200025SRui Paulo 		return LZMA_MEM_ERROR;
91053200025SRui Paulo 
91153200025SRui Paulo 	return LZMA_OK;
91253200025SRui Paulo }
91353200025SRui Paulo 
91453200025SRui Paulo 
91553200025SRui Paulo static void
916*1456f0f9SXin LI get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
91753200025SRui Paulo {
918*1456f0f9SXin LI 	lzma_stream_coder *coder = coder_ptr;
919*1456f0f9SXin LI 
92053200025SRui Paulo 	// Lock coder->mutex to prevent finishing threads from moving their
921*1456f0f9SXin LI 	// progress info from the worker_thread structure to lzma_stream_coder.
92253200025SRui Paulo 	mythread_sync(coder->mutex) {
92353200025SRui Paulo 		*progress_in = coder->progress_in;
92453200025SRui Paulo 		*progress_out = coder->progress_out;
92553200025SRui Paulo 
92653200025SRui Paulo 		for (size_t i = 0; i < coder->threads_initialized; ++i) {
92753200025SRui Paulo 			mythread_sync(coder->threads[i].mutex) {
92853200025SRui Paulo 				*progress_in += coder->threads[i].progress_in;
92953200025SRui Paulo 				*progress_out += coder->threads[i]
93053200025SRui Paulo 						.progress_out;
93153200025SRui Paulo 			}
93253200025SRui Paulo 		}
93353200025SRui Paulo 	}
93453200025SRui Paulo 
93553200025SRui Paulo 	return;
93653200025SRui Paulo }
93753200025SRui Paulo 
93853200025SRui Paulo 
93953200025SRui Paulo static lzma_ret
94053200025SRui Paulo stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
94153200025SRui Paulo 		const lzma_mt *options)
94253200025SRui Paulo {
94353200025SRui Paulo 	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
94453200025SRui Paulo 
94553200025SRui Paulo 	// Get the filter chain.
94653200025SRui Paulo 	lzma_options_easy easy;
94753200025SRui Paulo 	const lzma_filter *filters;
94853200025SRui Paulo 	uint64_t block_size;
94953200025SRui Paulo 	uint64_t outbuf_size_max;
95053200025SRui Paulo 	return_if_error(get_options(options, &easy, &filters,
95153200025SRui Paulo 			&block_size, &outbuf_size_max));
95253200025SRui Paulo 
95353200025SRui Paulo #if SIZE_MAX < UINT64_MAX
95453200025SRui Paulo 	if (block_size > SIZE_MAX)
95553200025SRui Paulo 		return LZMA_MEM_ERROR;
95653200025SRui Paulo #endif
95753200025SRui Paulo 
95853200025SRui Paulo 	// Validate the filter chain so that we can give an error in this
95953200025SRui Paulo 	// function instead of delaying it to the first call to lzma_code().
96053200025SRui Paulo 	// The memory usage calculation verifies the filter chain as
96153200025SRui Paulo 	// a side effect so we take advatange of that.
96253200025SRui Paulo 	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
96353200025SRui Paulo 		return LZMA_OPTIONS_ERROR;
96453200025SRui Paulo 
96553200025SRui Paulo 	// Validate the Check ID.
96653200025SRui Paulo 	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
96753200025SRui Paulo 		return LZMA_PROG_ERROR;
96853200025SRui Paulo 
96953200025SRui Paulo 	if (!lzma_check_is_supported(options->check))
97053200025SRui Paulo 		return LZMA_UNSUPPORTED_CHECK;
97153200025SRui Paulo 
97253200025SRui Paulo 	// Allocate and initialize the base structure if needed.
973*1456f0f9SXin LI 	lzma_stream_coder *coder = next->coder;
974*1456f0f9SXin LI 	if (coder == NULL) {
975*1456f0f9SXin LI 		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976*1456f0f9SXin LI 		if (coder == NULL)
97753200025SRui Paulo 			return LZMA_MEM_ERROR;
97853200025SRui Paulo 
979*1456f0f9SXin LI 		next->coder = coder;
980*1456f0f9SXin LI 
98153200025SRui Paulo 		// For the mutex and condition variable initializations
98253200025SRui Paulo 		// the error handling has to be done here because
98353200025SRui Paulo 		// stream_encoder_mt_end() doesn't know if they have
98453200025SRui Paulo 		// already been initialized or not.
985*1456f0f9SXin LI 		if (mythread_mutex_init(&coder->mutex)) {
986*1456f0f9SXin LI 			lzma_free(coder, allocator);
98753200025SRui Paulo 			next->coder = NULL;
98853200025SRui Paulo 			return LZMA_MEM_ERROR;
98953200025SRui Paulo 		}
99053200025SRui Paulo 
991*1456f0f9SXin LI 		if (mythread_cond_init(&coder->cond)) {
992*1456f0f9SXin LI 			mythread_mutex_destroy(&coder->mutex);
993*1456f0f9SXin LI 			lzma_free(coder, allocator);
99453200025SRui Paulo 			next->coder = NULL;
99553200025SRui Paulo 			return LZMA_MEM_ERROR;
99653200025SRui Paulo 		}
99753200025SRui Paulo 
99853200025SRui Paulo 		next->code = &stream_encode_mt;
99953200025SRui Paulo 		next->end = &stream_encoder_mt_end;
100053200025SRui Paulo 		next->get_progress = &get_progress;
100153200025SRui Paulo // 		next->update = &stream_encoder_mt_update;
100253200025SRui Paulo 
1003*1456f0f9SXin LI 		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004*1456f0f9SXin LI 		coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005*1456f0f9SXin LI 		coder->index = NULL;
1006*1456f0f9SXin LI 		memzero(&coder->outq, sizeof(coder->outq));
1007*1456f0f9SXin LI 		coder->threads = NULL;
1008*1456f0f9SXin LI 		coder->threads_max = 0;
1009*1456f0f9SXin LI 		coder->threads_initialized = 0;
101053200025SRui Paulo 	}
101153200025SRui Paulo 
101253200025SRui Paulo 	// Basic initializations
1013*1456f0f9SXin LI 	coder->sequence = SEQ_STREAM_HEADER;
1014*1456f0f9SXin LI 	coder->block_size = (size_t)(block_size);
1015*1456f0f9SXin LI 	coder->thread_error = LZMA_OK;
1016*1456f0f9SXin LI 	coder->thr = NULL;
101753200025SRui Paulo 
101853200025SRui Paulo 	// Allocate the thread-specific base structures.
101953200025SRui Paulo 	assert(options->threads > 0);
1020*1456f0f9SXin LI 	if (coder->threads_max != options->threads) {
1021*1456f0f9SXin LI 		threads_end(coder, allocator);
102253200025SRui Paulo 
1023*1456f0f9SXin LI 		coder->threads = NULL;
1024*1456f0f9SXin LI 		coder->threads_max = 0;
102553200025SRui Paulo 
1026*1456f0f9SXin LI 		coder->threads_initialized = 0;
1027*1456f0f9SXin LI 		coder->threads_free = NULL;
102853200025SRui Paulo 
1029*1456f0f9SXin LI 		coder->threads = lzma_alloc(
103053200025SRui Paulo 				options->threads * sizeof(worker_thread),
103153200025SRui Paulo 				allocator);
1032*1456f0f9SXin LI 		if (coder->threads == NULL)
103353200025SRui Paulo 			return LZMA_MEM_ERROR;
103453200025SRui Paulo 
1035*1456f0f9SXin LI 		coder->threads_max = options->threads;
103653200025SRui Paulo 	} else {
103753200025SRui Paulo 		// Reuse the old structures and threads. Tell the running
103853200025SRui Paulo 		// threads to stop and wait until they have stopped.
1039*1456f0f9SXin LI 		threads_stop(coder, true);
104053200025SRui Paulo 	}
104153200025SRui Paulo 
104253200025SRui Paulo 	// Output queue
1043*1456f0f9SXin LI 	return_if_error(lzma_outq_init(&coder->outq, allocator,
104453200025SRui Paulo 			outbuf_size_max, options->threads));
104553200025SRui Paulo 
104653200025SRui Paulo 	// Timeout
1047*1456f0f9SXin LI 	coder->timeout = options->timeout;
104853200025SRui Paulo 
104953200025SRui Paulo 	// Free the old filter chain and copy the new one.
1050*1456f0f9SXin LI 	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051*1456f0f9SXin LI 		lzma_free(coder->filters[i].options, allocator);
105253200025SRui Paulo 
105353200025SRui Paulo 	return_if_error(lzma_filters_copy(
1054*1456f0f9SXin LI 			filters, coder->filters, allocator));
105553200025SRui Paulo 
105653200025SRui Paulo 	// Index
1057*1456f0f9SXin LI 	lzma_index_end(coder->index, allocator);
1058*1456f0f9SXin LI 	coder->index = lzma_index_init(allocator);
1059*1456f0f9SXin LI 	if (coder->index == NULL)
106053200025SRui Paulo 		return LZMA_MEM_ERROR;
106153200025SRui Paulo 
106253200025SRui Paulo 	// Stream Header
1063*1456f0f9SXin LI 	coder->stream_flags.version = 0;
1064*1456f0f9SXin LI 	coder->stream_flags.check = options->check;
106553200025SRui Paulo 	return_if_error(lzma_stream_header_encode(
1066*1456f0f9SXin LI 			&coder->stream_flags, coder->header));
106753200025SRui Paulo 
1068*1456f0f9SXin LI 	coder->header_pos = 0;
106953200025SRui Paulo 
107053200025SRui Paulo 	// Progress info
1071*1456f0f9SXin LI 	coder->progress_in = 0;
1072*1456f0f9SXin LI 	coder->progress_out = LZMA_STREAM_HEADER_SIZE;
107353200025SRui Paulo 
107453200025SRui Paulo 	return LZMA_OK;
107553200025SRui Paulo }
107653200025SRui Paulo 
107753200025SRui Paulo 
107853200025SRui Paulo extern LZMA_API(lzma_ret)
107953200025SRui Paulo lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
108053200025SRui Paulo {
108153200025SRui Paulo 	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
108253200025SRui Paulo 
108353200025SRui Paulo 	strm->internal->supported_actions[LZMA_RUN] = true;
108453200025SRui Paulo // 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
108553200025SRui Paulo 	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
108653200025SRui Paulo 	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
108753200025SRui Paulo 	strm->internal->supported_actions[LZMA_FINISH] = true;
108853200025SRui Paulo 
108953200025SRui Paulo 	return LZMA_OK;
109053200025SRui Paulo }
109153200025SRui Paulo 
109253200025SRui Paulo 
109353200025SRui Paulo // This function name is a monster but it's consistent with the older
109453200025SRui Paulo // monster names. :-( 31 chars is the max that C99 requires so in that
109553200025SRui Paulo // sense it's not too long. ;-)
109653200025SRui Paulo extern LZMA_API(uint64_t)
109753200025SRui Paulo lzma_stream_encoder_mt_memusage(const lzma_mt *options)
109853200025SRui Paulo {
109953200025SRui Paulo 	lzma_options_easy easy;
110053200025SRui Paulo 	const lzma_filter *filters;
110153200025SRui Paulo 	uint64_t block_size;
110253200025SRui Paulo 	uint64_t outbuf_size_max;
110353200025SRui Paulo 
110453200025SRui Paulo 	if (get_options(options, &easy, &filters, &block_size,
110553200025SRui Paulo 			&outbuf_size_max) != LZMA_OK)
110653200025SRui Paulo 		return UINT64_MAX;
110753200025SRui Paulo 
110853200025SRui Paulo 	// Memory usage of the input buffers
110953200025SRui Paulo 	const uint64_t inbuf_memusage = options->threads * block_size;
111053200025SRui Paulo 
111153200025SRui Paulo 	// Memory usage of the filter encoders
111253200025SRui Paulo 	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
111353200025SRui Paulo 	if (filters_memusage == UINT64_MAX)
111453200025SRui Paulo 		return UINT64_MAX;
111553200025SRui Paulo 
111653200025SRui Paulo 	filters_memusage *= options->threads;
111753200025SRui Paulo 
111853200025SRui Paulo 	// Memory usage of the output queue
111953200025SRui Paulo 	const uint64_t outq_memusage = lzma_outq_memusage(
112053200025SRui Paulo 			outbuf_size_max, options->threads);
112153200025SRui Paulo 	if (outq_memusage == UINT64_MAX)
112253200025SRui Paulo 		return UINT64_MAX;
112353200025SRui Paulo 
112453200025SRui Paulo 	// Sum them with overflow checking.
1125*1456f0f9SXin LI 	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126*1456f0f9SXin LI 			+ sizeof(lzma_stream_coder)
112753200025SRui Paulo 			+ options->threads * sizeof(worker_thread);
112853200025SRui Paulo 
112953200025SRui Paulo 	if (UINT64_MAX - total_memusage < inbuf_memusage)
113053200025SRui Paulo 		return UINT64_MAX;
113153200025SRui Paulo 
113253200025SRui Paulo 	total_memusage += inbuf_memusage;
113353200025SRui Paulo 
113453200025SRui Paulo 	if (UINT64_MAX - total_memusage < filters_memusage)
113553200025SRui Paulo 		return UINT64_MAX;
113653200025SRui Paulo 
113753200025SRui Paulo 	total_memusage += filters_memusage;
113853200025SRui Paulo 
113953200025SRui Paulo 	if (UINT64_MAX - total_memusage < outq_memusage)
114053200025SRui Paulo 		return UINT64_MAX;
114153200025SRui Paulo 
114253200025SRui Paulo 	return total_memusage + outq_memusage;
114353200025SRui Paulo }
1144