xref: /freebsd-src/sys/contrib/zstd/examples/streaming_compression_thread_pool.c (revision f7cd7fe51c4140960ebea00410ed62894f5625d1)
1*f7cd7fe5SConrad Meyer /*
2*f7cd7fe5SConrad Meyer  * Copyright (c) 2020, Martin Liska, SUSE, Facebook, Inc.
3*f7cd7fe5SConrad Meyer  * All rights reserved.
4*f7cd7fe5SConrad Meyer  *
5*f7cd7fe5SConrad Meyer  * This source code is licensed under both the BSD-style license (found in the
6*f7cd7fe5SConrad Meyer  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7*f7cd7fe5SConrad Meyer  * in the COPYING file in the root directory of this source tree).
8*f7cd7fe5SConrad Meyer  * You may select, at your option, one of the above-listed licenses.
9*f7cd7fe5SConrad Meyer  */
10*f7cd7fe5SConrad Meyer 
11*f7cd7fe5SConrad Meyer 
12*f7cd7fe5SConrad Meyer #include <stdio.h>     // printf
13*f7cd7fe5SConrad Meyer #include <stdlib.h>    // free
14*f7cd7fe5SConrad Meyer #include <string.h>    // memset, strcat, strlen
15*f7cd7fe5SConrad Meyer #include <zstd.h>      // presumes zstd library is installed
16*f7cd7fe5SConrad Meyer #include "common.h"    // Helper functions, CHECK(), and CHECK_ZSTD()
17*f7cd7fe5SConrad Meyer #include <pthread.h>
18*f7cd7fe5SConrad Meyer 
/* Per-file job description handed to compressFile_orDie() through the
 * pthread start-routine argument.
 */
typedef struct compress_args {
    const char *fname;      /* input path, opened for reading ("rb") by the worker */
    char *outName;          /* output path, opened for writing ("wb"); the worker free()s it */
    int cLevel;             /* value passed as ZSTD_c_compressionLevel */
#ifdef ZSTD_STATIC_LINKING_ONLY
    ZSTD_threadPool *pool;  /* pool handed to ZSTD_CCtx_refThreadPool() by the worker */
#endif
} compress_args_t;
28*f7cd7fe5SConrad Meyer 
29*f7cd7fe5SConrad Meyer static void *compressFile_orDie(void *data)
30*f7cd7fe5SConrad Meyer {
31*f7cd7fe5SConrad Meyer     compress_args_t *args = (compress_args_t *)data;
32*f7cd7fe5SConrad Meyer     fprintf (stderr, "Starting compression of %s with level %d\n", args->fname, args->cLevel);
33*f7cd7fe5SConrad Meyer     /* Open the input and output files. */
34*f7cd7fe5SConrad Meyer     FILE* const fin  = fopen_orDie(args->fname, "rb");
35*f7cd7fe5SConrad Meyer     FILE* const fout = fopen_orDie(args->outName, "wb");
36*f7cd7fe5SConrad Meyer     /* Create the input and output buffers.
37*f7cd7fe5SConrad Meyer      * They may be any size, but we recommend using these functions to size them.
38*f7cd7fe5SConrad Meyer      * Performance will only suffer significantly for very tiny buffers.
39*f7cd7fe5SConrad Meyer      */
40*f7cd7fe5SConrad Meyer     size_t const buffInSize = ZSTD_CStreamInSize();
41*f7cd7fe5SConrad Meyer     void*  const buffIn  = malloc_orDie(buffInSize);
42*f7cd7fe5SConrad Meyer     size_t const buffOutSize = ZSTD_CStreamOutSize();
43*f7cd7fe5SConrad Meyer     void*  const buffOut = malloc_orDie(buffOutSize);
44*f7cd7fe5SConrad Meyer 
45*f7cd7fe5SConrad Meyer     /* Create the context. */
46*f7cd7fe5SConrad Meyer     ZSTD_CCtx* const cctx = ZSTD_createCCtx();
47*f7cd7fe5SConrad Meyer     CHECK(cctx != NULL, "ZSTD_createCCtx() failed!");
48*f7cd7fe5SConrad Meyer 
49*f7cd7fe5SConrad Meyer #if defined(ZSTD_STATIC_LINKING_ONLY)
50*f7cd7fe5SConrad Meyer     size_t r = ZSTD_CCtx_refThreadPool(cctx, args->pool);
51*f7cd7fe5SConrad Meyer     CHECK(r == 0, "ZSTD_CCtx_refThreadPool failed!");
52*f7cd7fe5SConrad Meyer #endif
53*f7cd7fe5SConrad Meyer 
54*f7cd7fe5SConrad Meyer     /* Set any parameters you want.
55*f7cd7fe5SConrad Meyer      * Here we set the compression level, and enable the checksum.
56*f7cd7fe5SConrad Meyer      */
57*f7cd7fe5SConrad Meyer     CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, args->cLevel) );
58*f7cd7fe5SConrad Meyer     CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1) );
59*f7cd7fe5SConrad Meyer     ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 16);
60*f7cd7fe5SConrad Meyer 
61*f7cd7fe5SConrad Meyer     /* This loop read from the input file, compresses that entire chunk,
62*f7cd7fe5SConrad Meyer      * and writes all output produced to the output file.
63*f7cd7fe5SConrad Meyer      */
64*f7cd7fe5SConrad Meyer     size_t const toRead = buffInSize;
65*f7cd7fe5SConrad Meyer     for (;;) {
66*f7cd7fe5SConrad Meyer         size_t read = fread_orDie(buffIn, toRead, fin);
67*f7cd7fe5SConrad Meyer         /* Select the flush mode.
68*f7cd7fe5SConrad Meyer          * If the read may not be finished (read == toRead) we use
69*f7cd7fe5SConrad Meyer          * ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end.
70*f7cd7fe5SConrad Meyer          * Zstd optimizes the case where the first flush mode is ZSTD_e_end,
71*f7cd7fe5SConrad Meyer          * since it knows it is compressing the entire source in one pass.
72*f7cd7fe5SConrad Meyer          */
73*f7cd7fe5SConrad Meyer         int const lastChunk = (read < toRead);
74*f7cd7fe5SConrad Meyer         ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
75*f7cd7fe5SConrad Meyer         /* Set the input buffer to what we just read.
76*f7cd7fe5SConrad Meyer          * We compress until the input buffer is empty, each time flushing the
77*f7cd7fe5SConrad Meyer          * output.
78*f7cd7fe5SConrad Meyer          */
79*f7cd7fe5SConrad Meyer         ZSTD_inBuffer input = { buffIn, read, 0 };
80*f7cd7fe5SConrad Meyer         int finished;
81*f7cd7fe5SConrad Meyer         do {
82*f7cd7fe5SConrad Meyer             /* Compress into the output buffer and write all of the output to
83*f7cd7fe5SConrad Meyer              * the file so we can reuse the buffer next iteration.
84*f7cd7fe5SConrad Meyer              */
85*f7cd7fe5SConrad Meyer             ZSTD_outBuffer output = { buffOut, buffOutSize, 0 };
86*f7cd7fe5SConrad Meyer             size_t const remaining = ZSTD_compressStream2(cctx, &output , &input, mode);
87*f7cd7fe5SConrad Meyer             CHECK_ZSTD(remaining);
88*f7cd7fe5SConrad Meyer             fwrite_orDie(buffOut, output.pos, fout);
89*f7cd7fe5SConrad Meyer             /* If we're on the last chunk we're finished when zstd returns 0,
90*f7cd7fe5SConrad Meyer              * which means its consumed all the input AND finished the frame.
91*f7cd7fe5SConrad Meyer              * Otherwise, we're finished when we've consumed all the input.
92*f7cd7fe5SConrad Meyer              */
93*f7cd7fe5SConrad Meyer             finished = lastChunk ? (remaining == 0) : (input.pos == input.size);
94*f7cd7fe5SConrad Meyer         } while (!finished);
95*f7cd7fe5SConrad Meyer         CHECK(input.pos == input.size,
96*f7cd7fe5SConrad Meyer               "Impossible: zstd only returns 0 when the input is completely consumed!");
97*f7cd7fe5SConrad Meyer 
98*f7cd7fe5SConrad Meyer         if (lastChunk) {
99*f7cd7fe5SConrad Meyer             break;
100*f7cd7fe5SConrad Meyer         }
101*f7cd7fe5SConrad Meyer     }
102*f7cd7fe5SConrad Meyer 
103*f7cd7fe5SConrad Meyer     fprintf (stderr, "Finishing compression of %s\n", args->outName);
104*f7cd7fe5SConrad Meyer 
105*f7cd7fe5SConrad Meyer     ZSTD_freeCCtx(cctx);
106*f7cd7fe5SConrad Meyer     fclose_orDie(fout);
107*f7cd7fe5SConrad Meyer     fclose_orDie(fin);
108*f7cd7fe5SConrad Meyer     free(buffIn);
109*f7cd7fe5SConrad Meyer     free(buffOut);
110*f7cd7fe5SConrad Meyer     free(args->outName);
111*f7cd7fe5SConrad Meyer 
112*f7cd7fe5SConrad Meyer     return NULL;
113*f7cd7fe5SConrad Meyer }
114*f7cd7fe5SConrad Meyer 
115*f7cd7fe5SConrad Meyer 
/* Builds the output file name for `filename` by appending ".zst".
 *
 * The result is a NUL-terminated string in a buffer of strlen(filename) + 5
 * bytes obtained from malloc_orDie(); the caller is responsible for
 * releasing it with free().
 */
static char* createOutFilename_orDie(const char* filename)
{
    static const char suffix[] = ".zst";          /* 4 characters + NUL = 5 bytes */
    size_t const nameLen = strlen(filename);
    size_t const total = nameLen + 5;
    char* const result = (char*)malloc_orDie(total);

    /* Name first, then the suffix together with its terminating NUL;
     * the two copies fill the buffer exactly.
     */
    memcpy(result, filename, nameLen);
    memcpy(result + nameLen, suffix, sizeof suffix);
    return result;
}
126*f7cd7fe5SConrad Meyer 
127*f7cd7fe5SConrad Meyer int main(int argc, const char** argv)
128*f7cd7fe5SConrad Meyer {
129*f7cd7fe5SConrad Meyer     const char* const exeName = argv[0];
130*f7cd7fe5SConrad Meyer 
131*f7cd7fe5SConrad Meyer     if (argc<=3) {
132*f7cd7fe5SConrad Meyer         printf("wrong arguments\n");
133*f7cd7fe5SConrad Meyer         printf("usage:\n");
134*f7cd7fe5SConrad Meyer         printf("%s POOL_SIZE LEVEL FILES\n", exeName);
135*f7cd7fe5SConrad Meyer         return 1;
136*f7cd7fe5SConrad Meyer     }
137*f7cd7fe5SConrad Meyer 
138*f7cd7fe5SConrad Meyer     int pool_size = atoi (argv[1]);
139*f7cd7fe5SConrad Meyer     CHECK(pool_size != 0, "can't parse POOL_SIZE!");
140*f7cd7fe5SConrad Meyer 
141*f7cd7fe5SConrad Meyer     int level = atoi (argv[2]);
142*f7cd7fe5SConrad Meyer     CHECK(level != 0, "can't parse LEVEL!");
143*f7cd7fe5SConrad Meyer 
144*f7cd7fe5SConrad Meyer     argc -= 3;
145*f7cd7fe5SConrad Meyer     argv += 3;
146*f7cd7fe5SConrad Meyer 
147*f7cd7fe5SConrad Meyer #if defined(ZSTD_STATIC_LINKING_ONLY)
148*f7cd7fe5SConrad Meyer     ZSTD_threadPool *pool = ZSTD_createThreadPool (pool_size);
149*f7cd7fe5SConrad Meyer     CHECK(pool != NULL, "ZSTD_createThreadPool() failed!");
150*f7cd7fe5SConrad Meyer     fprintf (stderr, "Using shared thread pool of size %d\n", pool_size);
151*f7cd7fe5SConrad Meyer #else
152*f7cd7fe5SConrad Meyer     fprintf (stderr, "All threads use its own thread pool\n");
153*f7cd7fe5SConrad Meyer #endif
154*f7cd7fe5SConrad Meyer 
155*f7cd7fe5SConrad Meyer     pthread_t *threads = malloc_orDie(argc * sizeof(pthread_t));
156*f7cd7fe5SConrad Meyer     compress_args_t *args = malloc_orDie(argc * sizeof(compress_args_t));
157*f7cd7fe5SConrad Meyer 
158*f7cd7fe5SConrad Meyer     for (unsigned i = 0; i < argc; i++)
159*f7cd7fe5SConrad Meyer     {
160*f7cd7fe5SConrad Meyer       args[i].fname = argv[i];
161*f7cd7fe5SConrad Meyer       args[i].outName = createOutFilename_orDie(args[i].fname);
162*f7cd7fe5SConrad Meyer       args[i].cLevel = level;
163*f7cd7fe5SConrad Meyer #if defined(ZSTD_STATIC_LINKING_ONLY)
164*f7cd7fe5SConrad Meyer       args[i].pool = pool;
165*f7cd7fe5SConrad Meyer #endif
166*f7cd7fe5SConrad Meyer 
167*f7cd7fe5SConrad Meyer       pthread_create (&threads[i], NULL, compressFile_orDie, &args[i]);
168*f7cd7fe5SConrad Meyer     }
169*f7cd7fe5SConrad Meyer 
170*f7cd7fe5SConrad Meyer     for (unsigned i = 0; i < argc; i++)
171*f7cd7fe5SConrad Meyer       pthread_join (threads[i], NULL);
172*f7cd7fe5SConrad Meyer 
173*f7cd7fe5SConrad Meyer #if defined(ZSTD_STATIC_LINKING_ONLY)
174*f7cd7fe5SConrad Meyer     ZSTD_freeThreadPool (pool);
175*f7cd7fe5SConrad Meyer #endif
176*f7cd7fe5SConrad Meyer 
177*f7cd7fe5SConrad Meyer     return 0;
178*f7cd7fe5SConrad Meyer }
179