1*3117ece4Schristos /* 2*3117ece4Schristos * Copyright (c) Meta Platforms, Inc. and affiliates. 3*3117ece4Schristos * All rights reserved. 4*3117ece4Schristos * 5*3117ece4Schristos * This source code is licensed under both the BSD-style license (found in the 6*3117ece4Schristos * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7*3117ece4Schristos * in the COPYING file in the root directory of this source tree). 8*3117ece4Schristos */ 9*3117ece4Schristos #pragma once 10*3117ece4Schristos 11*3117ece4Schristos #include "ErrorHolder.h" 12*3117ece4Schristos #include "Logging.h" 13*3117ece4Schristos #include "Options.h" 14*3117ece4Schristos #include "utils/Buffer.h" 15*3117ece4Schristos #include "utils/Range.h" 16*3117ece4Schristos #include "utils/ResourcePool.h" 17*3117ece4Schristos #include "utils/ThreadPool.h" 18*3117ece4Schristos #include "utils/WorkQueue.h" 19*3117ece4Schristos #define ZSTD_STATIC_LINKING_ONLY 20*3117ece4Schristos #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated 21*3117ece4Schristos * and uses deprecated functions 22*3117ece4Schristos */ 23*3117ece4Schristos #include "zstd.h" 24*3117ece4Schristos #undef ZSTD_STATIC_LINKING_ONLY 25*3117ece4Schristos 26*3117ece4Schristos #include <cstddef> 27*3117ece4Schristos #include <cstdint> 28*3117ece4Schristos #include <memory> 29*3117ece4Schristos 30*3117ece4Schristos namespace pzstd { 31*3117ece4Schristos /** 32*3117ece4Schristos * Runs pzstd with `options` and returns the number of bytes written. 33*3117ece4Schristos * An error occurred if `errorHandler.hasError()`. 34*3117ece4Schristos * 35*3117ece4Schristos * @param options The pzstd options to use for (de)compression 36*3117ece4Schristos * @returns 0 upon success and non-zero on failure. 37*3117ece4Schristos */ 38*3117ece4Schristos int pzstdMain(const Options& options); 39*3117ece4Schristos 40*3117ece4Schristos class SharedState { 41*3117ece4Schristos public: 42*3117ece4Schristos SharedState(const Options& options) : log(options.verbosity) { 43*3117ece4Schristos if (!options.decompress) { 44*3117ece4Schristos auto parameters = options.determineParameters(); 45*3117ece4Schristos cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ 46*3117ece4Schristos [this, parameters]() -> ZSTD_CStream* { 47*3117ece4Schristos this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream"); 48*3117ece4Schristos auto zcs = ZSTD_createCStream(); 49*3117ece4Schristos if (zcs) { 50*3117ece4Schristos auto err = ZSTD_initCStream_advanced( 51*3117ece4Schristos zcs, nullptr, 0, parameters, 0); 52*3117ece4Schristos if (ZSTD_isError(err)) { 53*3117ece4Schristos ZSTD_freeCStream(zcs); 54*3117ece4Schristos return nullptr; 55*3117ece4Schristos } 56*3117ece4Schristos } 57*3117ece4Schristos return zcs; 58*3117ece4Schristos }, 59*3117ece4Schristos [](ZSTD_CStream *zcs) { 60*3117ece4Schristos ZSTD_freeCStream(zcs); 61*3117ece4Schristos }}); 62*3117ece4Schristos } else { 63*3117ece4Schristos dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ 64*3117ece4Schristos [this]() -> ZSTD_DStream* { 65*3117ece4Schristos this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream"); 66*3117ece4Schristos auto zds = ZSTD_createDStream(); 67*3117ece4Schristos if (zds) { 68*3117ece4Schristos auto err = ZSTD_initDStream(zds); 69*3117ece4Schristos if (ZSTD_isError(err)) { 70*3117ece4Schristos ZSTD_freeDStream(zds); 71*3117ece4Schristos return nullptr; 72*3117ece4Schristos } 73*3117ece4Schristos } 74*3117ece4Schristos return zds; 75*3117ece4Schristos }, 76*3117ece4Schristos [](ZSTD_DStream *zds) { 77*3117ece4Schristos ZSTD_freeDStream(zds); 78*3117ece4Schristos }}); 79*3117ece4Schristos } 80*3117ece4Schristos } 81*3117ece4Schristos 82*3117ece4Schristos ~SharedState() { 83*3117ece4Schristos // The resource pools have references to this, so destroy them first. 84*3117ece4Schristos cStreamPool.reset(); 85*3117ece4Schristos dStreamPool.reset(); 86*3117ece4Schristos } 87*3117ece4Schristos 88*3117ece4Schristos Logger log; 89*3117ece4Schristos ErrorHolder errorHolder; 90*3117ece4Schristos std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; 91*3117ece4Schristos std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; 92*3117ece4Schristos }; 93*3117ece4Schristos 94*3117ece4Schristos /** 95*3117ece4Schristos * Streams input from `fd`, breaks input up into chunks, and compresses each 96*3117ece4Schristos * chunk independently. Output of each chunk gets streamed to a queue, and 97*3117ece4Schristos * the output queues get put into `chunks` in order. 98*3117ece4Schristos * 99*3117ece4Schristos * @param state The shared state 100*3117ece4Schristos * @param chunks Each compression jobs output queue gets `pushed()` here 101*3117ece4Schristos * as soon as it is available 102*3117ece4Schristos * @param executor The thread pool to run compression jobs in 103*3117ece4Schristos * @param fd The input file descriptor 104*3117ece4Schristos * @param size The size of the input file if known, 0 otherwise 105*3117ece4Schristos * @param numThreads The number of threads in the thread pool 106*3117ece4Schristos * @param parameters The zstd parameters to use for compression 107*3117ece4Schristos * @returns The number of bytes read from the file 108*3117ece4Schristos */ 109*3117ece4Schristos std::uint64_t asyncCompressChunks( 110*3117ece4Schristos SharedState& state, 111*3117ece4Schristos WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, 112*3117ece4Schristos ThreadPool& executor, 113*3117ece4Schristos FILE* fd, 114*3117ece4Schristos std::uintmax_t size, 115*3117ece4Schristos std::size_t numThreads, 116*3117ece4Schristos ZSTD_parameters parameters); 117*3117ece4Schristos 118*3117ece4Schristos /** 119*3117ece4Schristos * Streams input from `fd`. If pzstd headers are available it breaks the input 120*3117ece4Schristos * up into independent frames. It sends each frame to an independent 121*3117ece4Schristos * decompression job. Output of each frame gets streamed to a queue, and 122*3117ece4Schristos * the output queues get put into `frames` in order. 123*3117ece4Schristos * 124*3117ece4Schristos * @param state The shared state 125*3117ece4Schristos * @param frames Each decompression jobs output queue gets `pushed()` here 126*3117ece4Schristos * as soon as it is available 127*3117ece4Schristos * @param executor The thread pool to run compression jobs in 128*3117ece4Schristos * @param fd The input file descriptor 129*3117ece4Schristos * @returns The number of bytes read from the file 130*3117ece4Schristos */ 131*3117ece4Schristos std::uint64_t asyncDecompressFrames( 132*3117ece4Schristos SharedState& state, 133*3117ece4Schristos WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, 134*3117ece4Schristos ThreadPool& executor, 135*3117ece4Schristos FILE* fd); 136*3117ece4Schristos 137*3117ece4Schristos /** 138*3117ece4Schristos * Streams input in from each queue in `outs` in order, and writes the data to 139*3117ece4Schristos * `outputFd`. 140*3117ece4Schristos * 141*3117ece4Schristos * @param state The shared state 142*3117ece4Schristos * @param outs A queue of output queues, one for each 143*3117ece4Schristos * (de)compression job. 144*3117ece4Schristos * @param outputFd The file descriptor to write to 145*3117ece4Schristos * @param decompress Are we decompressing? 146*3117ece4Schristos * @returns The number of bytes written 147*3117ece4Schristos */ 148*3117ece4Schristos std::uint64_t writeFile( 149*3117ece4Schristos SharedState& state, 150*3117ece4Schristos WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, 151*3117ece4Schristos FILE* outputFd, 152*3117ece4Schristos bool decompress); 153*3117ece4Schristos } 154