xref: /netbsd-src/external/bsd/zstd/dist/contrib/pzstd/Pzstd.h (revision 3117ece4fc4a4ca4489ba793710b60b0d26bab6c)
1*3117ece4Schristos /*
2*3117ece4Schristos  * Copyright (c) Meta Platforms, Inc. and affiliates.
3*3117ece4Schristos  * All rights reserved.
4*3117ece4Schristos  *
5*3117ece4Schristos  * This source code is licensed under both the BSD-style license (found in the
6*3117ece4Schristos  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7*3117ece4Schristos  * in the COPYING file in the root directory of this source tree).
8*3117ece4Schristos  */
9*3117ece4Schristos #pragma once
10*3117ece4Schristos 
11*3117ece4Schristos #include "ErrorHolder.h"
12*3117ece4Schristos #include "Logging.h"
13*3117ece4Schristos #include "Options.h"
14*3117ece4Schristos #include "utils/Buffer.h"
15*3117ece4Schristos #include "utils/Range.h"
16*3117ece4Schristos #include "utils/ResourcePool.h"
17*3117ece4Schristos #include "utils/ThreadPool.h"
18*3117ece4Schristos #include "utils/WorkQueue.h"
19*3117ece4Schristos #define ZSTD_STATIC_LINKING_ONLY
20*3117ece4Schristos #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated
21*3117ece4Schristos                                          * and uses deprecated functions
22*3117ece4Schristos                                          */
23*3117ece4Schristos #include "zstd.h"
24*3117ece4Schristos #undef ZSTD_STATIC_LINKING_ONLY
25*3117ece4Schristos 
26*3117ece4Schristos #include <cstddef>
27*3117ece4Schristos #include <cstdint>
28*3117ece4Schristos #include <memory>
29*3117ece4Schristos 
30*3117ece4Schristos namespace pzstd {
31*3117ece4Schristos /**
32*3117ece4Schristos  * Runs pzstd with `options` and returns the number of bytes written.
33*3117ece4Schristos  * An error occurred if `errorHandler.hasError()`.
34*3117ece4Schristos  *
35*3117ece4Schristos  * @param options      The pzstd options to use for (de)compression
36*3117ece4Schristos  * @returns            0 upon success and non-zero on failure.
37*3117ece4Schristos  */
38*3117ece4Schristos int pzstdMain(const Options& options);
39*3117ece4Schristos 
40*3117ece4Schristos class SharedState {
41*3117ece4Schristos  public:
42*3117ece4Schristos   SharedState(const Options& options) : log(options.verbosity) {
43*3117ece4Schristos     if (!options.decompress) {
44*3117ece4Schristos       auto parameters = options.determineParameters();
45*3117ece4Schristos       cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
46*3117ece4Schristos           [this, parameters]() -> ZSTD_CStream* {
47*3117ece4Schristos             this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream");
48*3117ece4Schristos             auto zcs = ZSTD_createCStream();
49*3117ece4Schristos             if (zcs) {
50*3117ece4Schristos               auto err = ZSTD_initCStream_advanced(
51*3117ece4Schristos                   zcs, nullptr, 0, parameters, 0);
52*3117ece4Schristos               if (ZSTD_isError(err)) {
53*3117ece4Schristos                 ZSTD_freeCStream(zcs);
54*3117ece4Schristos                 return nullptr;
55*3117ece4Schristos               }
56*3117ece4Schristos             }
57*3117ece4Schristos             return zcs;
58*3117ece4Schristos           },
59*3117ece4Schristos           [](ZSTD_CStream *zcs) {
60*3117ece4Schristos             ZSTD_freeCStream(zcs);
61*3117ece4Schristos           }});
62*3117ece4Schristos     } else {
63*3117ece4Schristos       dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
64*3117ece4Schristos           [this]() -> ZSTD_DStream* {
65*3117ece4Schristos             this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream");
66*3117ece4Schristos             auto zds = ZSTD_createDStream();
67*3117ece4Schristos             if (zds) {
68*3117ece4Schristos               auto err = ZSTD_initDStream(zds);
69*3117ece4Schristos               if (ZSTD_isError(err)) {
70*3117ece4Schristos                 ZSTD_freeDStream(zds);
71*3117ece4Schristos                 return nullptr;
72*3117ece4Schristos               }
73*3117ece4Schristos             }
74*3117ece4Schristos             return zds;
75*3117ece4Schristos           },
76*3117ece4Schristos           [](ZSTD_DStream *zds) {
77*3117ece4Schristos             ZSTD_freeDStream(zds);
78*3117ece4Schristos           }});
79*3117ece4Schristos     }
80*3117ece4Schristos   }
81*3117ece4Schristos 
82*3117ece4Schristos   ~SharedState() {
83*3117ece4Schristos     // The resource pools have references to this, so destroy them first.
84*3117ece4Schristos     cStreamPool.reset();
85*3117ece4Schristos     dStreamPool.reset();
86*3117ece4Schristos   }
87*3117ece4Schristos 
88*3117ece4Schristos   Logger log;
89*3117ece4Schristos   ErrorHolder errorHolder;
90*3117ece4Schristos   std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
91*3117ece4Schristos   std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
92*3117ece4Schristos };
93*3117ece4Schristos 
94*3117ece4Schristos /**
95*3117ece4Schristos  * Streams input from `fd`, breaks input up into chunks, and compresses each
96*3117ece4Schristos  * chunk independently.  Output of each chunk gets streamed to a queue, and
97*3117ece4Schristos  * the output queues get put into `chunks` in order.
98*3117ece4Schristos  *
99*3117ece4Schristos  * @param state        The shared state
100*3117ece4Schristos  * @param chunks       Each compression jobs output queue gets `pushed()` here
101*3117ece4Schristos  *                      as soon as it is available
102*3117ece4Schristos  * @param executor     The thread pool to run compression jobs in
103*3117ece4Schristos  * @param fd           The input file descriptor
104*3117ece4Schristos  * @param size         The size of the input file if known, 0 otherwise
105*3117ece4Schristos  * @param numThreads   The number of threads in the thread pool
106*3117ece4Schristos  * @param parameters   The zstd parameters to use for compression
107*3117ece4Schristos  * @returns            The number of bytes read from the file
108*3117ece4Schristos  */
109*3117ece4Schristos std::uint64_t asyncCompressChunks(
110*3117ece4Schristos     SharedState& state,
111*3117ece4Schristos     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
112*3117ece4Schristos     ThreadPool& executor,
113*3117ece4Schristos     FILE* fd,
114*3117ece4Schristos     std::uintmax_t size,
115*3117ece4Schristos     std::size_t numThreads,
116*3117ece4Schristos     ZSTD_parameters parameters);
117*3117ece4Schristos 
118*3117ece4Schristos /**
119*3117ece4Schristos  * Streams input from `fd`.  If pzstd headers are available it breaks the input
120*3117ece4Schristos  * up into independent frames.  It sends each frame to an independent
121*3117ece4Schristos  * decompression job.  Output of each frame gets streamed to a queue, and
122*3117ece4Schristos  * the output queues get put into `frames` in order.
123*3117ece4Schristos  *
124*3117ece4Schristos  * @param state        The shared state
125*3117ece4Schristos  * @param frames       Each decompression jobs output queue gets `pushed()` here
126*3117ece4Schristos  *                      as soon as it is available
127*3117ece4Schristos  * @param executor     The thread pool to run compression jobs in
128*3117ece4Schristos  * @param fd           The input file descriptor
129*3117ece4Schristos  * @returns            The number of bytes read from the file
130*3117ece4Schristos  */
131*3117ece4Schristos std::uint64_t asyncDecompressFrames(
132*3117ece4Schristos     SharedState& state,
133*3117ece4Schristos     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
134*3117ece4Schristos     ThreadPool& executor,
135*3117ece4Schristos     FILE* fd);
136*3117ece4Schristos 
137*3117ece4Schristos /**
138*3117ece4Schristos  * Streams input in from each queue in `outs` in order, and writes the data to
139*3117ece4Schristos  * `outputFd`.
140*3117ece4Schristos  *
141*3117ece4Schristos  * @param state        The shared state
142*3117ece4Schristos  * @param outs         A queue of output queues, one for each
143*3117ece4Schristos  *                      (de)compression job.
144*3117ece4Schristos  * @param outputFd     The file descriptor to write to
145*3117ece4Schristos  * @param decompress   Are we decompressing?
146*3117ece4Schristos  * @returns            The number of bytes written
147*3117ece4Schristos  */
148*3117ece4Schristos std::uint64_t writeFile(
149*3117ece4Schristos     SharedState& state,
150*3117ece4Schristos     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
151*3117ece4Schristos     FILE* outputFd,
152*3117ece4Schristos     bool decompress);
153*3117ece4Schristos }
154