File size: 8,427 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace util {

/// \brief Special compression level value requesting a codec's default level.
///
/// Equal to the smallest representable `int`.  It is the default argument of
/// the CodecOptions constructor.
///
/// Declared `inline` so that all translation units including this header share
/// one definition instead of each getting its own internal-linkage copy.
inline constexpr int kUseDefaultCompressionLevel = std::numeric_limits<int>::min();

/// \brief Streaming compressor interface
///
/// Abstract interface.  Input is supplied through Compress(), part of the
/// compressed output can be flushed with Flush(), and the stream is terminated
/// with End().  Each call writes into a caller-supplied output buffer and
/// reports how much it wrote through its result struct.
class ARROW_EXPORT Compressor {
 public:
  virtual ~Compressor() = default;

  /// \brief Value returned by Compress().
  struct CompressResult {
    /// Number of input bytes consumed.  A value of 0 means a larger output
    /// buffer should be supplied (see Compress()).
    int64_t bytes_read;
    /// Number of bytes written to the output buffer.
    int64_t bytes_written;
  };
  /// \brief Value returned by Flush().
  struct FlushResult {
    /// Number of bytes written to the output buffer.
    int64_t bytes_written;
    /// If true, Flush() should be called again with a larger buffer.
    bool should_retry;
  };
  /// \brief Value returned by End().
  struct EndResult {
    /// Number of bytes written to the output buffer.
    int64_t bytes_written;
    /// If true, End() should be called again with a larger buffer.
    bool should_retry;
  };

  /// \brief Compress some input.
  ///
  /// If bytes_read is 0 on return, then a larger output buffer should be supplied.
  ///
  /// \param[in] input_len length of the input buffer
  /// \param[in] input the data to compress
  /// \param[in] output_len length of the output buffer
  /// \param[out] output buffer receiving the compressed data
  /// \return the number of bytes read and written, or an error
  virtual Result<CompressResult> Compress(int64_t input_len, const uint8_t* input,
                                          int64_t output_len, uint8_t* output) = 0;

  /// \brief Flush part of the compressed output.
  ///
  /// If should_retry is true on return, Flush() should be called again
  /// with a larger buffer.
  ///
  /// \param[in] output_len length of the output buffer
  /// \param[out] output buffer receiving the flushed data
  /// \return the number of bytes written and the retry flag, or an error
  virtual Result<FlushResult> Flush(int64_t output_len, uint8_t* output) = 0;

  /// \brief End compressing, doing whatever is necessary to end the stream.
  ///
  /// If should_retry is true on return, End() should be called again
  /// with a larger buffer.  Otherwise, the Compressor should not be used anymore.
  ///
  /// End() implies Flush().
  ///
  /// \param[in] output_len length of the output buffer
  /// \param[out] output buffer receiving the final data
  /// \return the number of bytes written and the retry flag, or an error
  virtual Result<EndResult> End(int64_t output_len, uint8_t* output) = 0;

  // XXX add methods for buffer size heuristics?
};

/// \brief Streaming decompressor interface
///
/// Abstract interface.  Compressed input is supplied through Decompress(),
/// which writes into a caller-supplied output buffer.  Reset() prepares the
/// object for a new compressed stream.
class ARROW_EXPORT Decompressor {
 public:
  virtual ~Decompressor() = default;

  /// \brief Value returned by Decompress().
  struct DecompressResult {
    // XXX is need_more_output necessary? (Brotli?)
    /// Number of input bytes consumed.
    int64_t bytes_read;
    /// Number of bytes written to the output buffer.
    int64_t bytes_written;
    /// If true, a larger output buffer needs to be supplied (see Decompress()).
    bool need_more_output;
  };

  /// \brief Decompress some input.
  ///
  /// If need_more_output is true on return, a larger output buffer needs
  /// to be supplied.
  ///
  /// \param[in] input_len length of the input buffer
  /// \param[in] input the compressed data
  /// \param[in] output_len length of the output buffer
  /// \param[out] output buffer receiving the decompressed data
  /// \return the bytes read/written and the need_more_output flag, or an error
  virtual Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input,
                                              int64_t output_len, uint8_t* output) = 0;

  /// \brief Return whether the compressed stream is finished.
  ///
  /// This is a heuristic.  If true is returned, then it is guaranteed
  /// that the stream is finished.  If false is returned, however, it may
  /// simply be that the underlying library isn't able to provide the information.
  virtual bool IsFinished() = 0;

  /// \brief Reinitialize decompressor, making it ready for a new compressed stream.
  ///
  /// \return a Status reporting whether the reinitialization succeeded
  virtual Status Reset() = 0;

  // XXX add methods for buffer size heuristics?
};

/// \brief Compression codec options
class ARROW_EXPORT CodecOptions {
 public:
  explicit CodecOptions(int compression_level = kUseDefaultCompressionLevel)
      : compression_level(compression_level) {}

  virtual ~CodecOptions() = default;

  int compression_level;
};

// ----------------------------------------------------------------------
// GZip codec options implementation

/// \brief Format selector stored in GZipCodecOptions::gzip_format
///
/// The enumerator values are written out explicitly; they are the same
/// 0, 1, 2 numbering the compiler assigns implicitly.
enum class GZipFormat {
  ZLIB = 0,
  DEFLATE = 1,
  GZIP = 2,
};

/// \brief Codec options for the GZip codec
///
/// Derives from CodecOptions, so it also carries compression_level, and adds
/// the fields below.
class ARROW_EXPORT GZipCodecOptions : public CodecOptions {
 public:
  /// Format to use; GZipFormat::GZIP unless changed.
  GZipFormat gzip_format = GZipFormat::GZIP;
  /// Optional window bits setting; empty unless explicitly assigned.
  /// NOTE(review): presumably the codec uses its own default when this is
  /// empty -- confirm against the codec implementation.
  std::optional<int> window_bits;
};

// ----------------------------------------------------------------------
// Brotli codec options implementation

/// \brief Codec options for the Brotli codec
///
/// Derives from CodecOptions, so it also carries compression_level, and adds
/// the field below.
class ARROW_EXPORT BrotliCodecOptions : public CodecOptions {
 public:
  /// Optional window bits setting; empty unless explicitly assigned.
  /// NOTE(review): presumably the codec uses its own default when this is
  /// empty -- confirm against the codec implementation.
  std::optional<int> window_bits;
};

/// \brief Compression codec
///
/// Abstract base class.  The static functions look up, describe and create
/// codecs by Compression::type; the virtual functions expose one-shot
/// compression/decompression, factories for the streaming Compressor and
/// Decompressor interfaces, and the codec's compression level range.
class ARROW_EXPORT Codec {
 public:
  virtual ~Codec() = default;

  /// \brief Return special value to indicate that a codec implementation
  /// should use its default compression level
  static int UseDefaultCompressionLevel();

  /// \brief Return a string name for compression type
  static const std::string& GetCodecAsString(Compression::type t);

  /// \brief Return compression type for name (all lower case)
  static Result<Compression::type> GetCompressionType(const std::string& name);

  /// \brief Create a codec for the given compression algorithm with CodecOptions
  ///
  /// \param[in] codec the compression algorithm
  /// \param[in] codec_options the options to use; a default-constructed
  /// CodecOptions when omitted
  static Result<std::unique_ptr<Codec>> Create(
      Compression::type codec, const CodecOptions& codec_options = CodecOptions{});

  /// \brief Create a codec for the given compression algorithm
  ///
  /// \param[in] codec the compression algorithm
  /// \param[in] compression_level the compression level to use
  static Result<std::unique_ptr<Codec>> Create(Compression::type codec,
                                               int compression_level);

  /// \brief Return true if support for indicated codec has been enabled
  static bool IsAvailable(Compression::type codec);

  /// \brief Return true if indicated codec supports setting a compression level
  static bool SupportsCompressionLevel(Compression::type codec);

  /// \brief Return the smallest supported compression level for the codec
  /// Note: This function creates a temporary Codec instance
  static Result<int> MinimumCompressionLevel(Compression::type codec);

  /// \brief Return the largest supported compression level for the codec
  /// Note: This function creates a temporary Codec instance
  static Result<int> MaximumCompressionLevel(Compression::type codec);

  /// \brief Return the default compression level
  /// Note: This function creates a temporary Codec instance
  static Result<int> DefaultCompressionLevel(Compression::type codec);

  /// \brief Return the smallest supported compression level
  virtual int minimum_compression_level() const = 0;

  /// \brief Return the largest supported compression level
  virtual int maximum_compression_level() const = 0;

  /// \brief Return the default compression level
  virtual int default_compression_level() const = 0;

  /// \brief One-shot decompression function
  ///
  /// output_buffer_len must be correct and therefore be obtained in advance.
  /// The actual decompressed length is returned.
  ///
  /// \note One-shot decompression is not always compatible with streaming
  /// compression.  Depending on the codec (e.g. LZ4), different formats may
  /// be used.
  ///
  /// \param[in] input_len length of the compressed input
  /// \param[in] input the compressed data
  /// \param[in] output_buffer_len length of the output buffer
  /// \param[out] output_buffer buffer receiving the decompressed data
  virtual Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                                     int64_t output_buffer_len,
                                     uint8_t* output_buffer) = 0;

  /// \brief One-shot compression function
  ///
  /// output_buffer_len must first have been computed using MaxCompressedLen().
  /// The actual compressed length is returned.
  ///
  /// \note One-shot compression is not always compatible with streaming
  /// decompression.  Depending on the codec (e.g. LZ4), different formats may
  /// be used.
  ///
  /// \param[in] input_len length of the input
  /// \param[in] input the data to compress
  /// \param[in] output_buffer_len length of the output buffer
  /// \param[out] output_buffer buffer receiving the compressed data
  virtual Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
                                   int64_t output_buffer_len, uint8_t* output_buffer) = 0;

  /// \brief Return the output buffer length to pass to Compress() for this input
  ///
  /// Compress() requires its output_buffer_len to have been computed with
  /// this function.
  /// NOTE(review): presumably an upper bound on the compressed size, as the
  /// name suggests -- confirm against the codec implementations.
  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0;

  /// \brief Create a streaming compressor instance
  virtual Result<std::shared_ptr<Compressor>> MakeCompressor() = 0;

  /// \brief Create a streaming decompressor instance
  virtual Result<std::shared_ptr<Decompressor>> MakeDecompressor() = 0;

  /// \brief This Codec's compression type
  virtual Compression::type compression_type() const = 0;

  /// \brief The name of this Codec's compression type
  ///
  /// Forwards compression_type() to GetCodecAsString().
  const std::string& name() const { return GetCodecAsString(compression_type()); }

  /// \brief This Codec's compression level, if applicable
  ///
  /// This base implementation returns UseDefaultCompressionLevel().
  virtual int compression_level() const { return UseDefaultCompressionLevel(); }

 private:
  /// \brief Initializes the codec's resources.
  ///
  /// Not pure virtual: only declared here, with its definition outside this
  /// header.
  virtual Status Init();
};

}  // namespace util
}  // namespace arrow