• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2075

27 Feb 2024 04:12PM UTC coverage: 90.97% (+0.05%) from 90.925%
2075

push

Evergreen

web-flow
Eliminate copies when accessing values from Bson types (#7377)

Returning things by value performs a deep copy, which is very expensive when
those things are also bson containers.

Re-align the function names with the conventional names instead of using
unusual, inconsistent ones.

93914 of 173104 branches covered (54.25%)

82 of 82 new or added lines in 9 files covered. (100.0%)

66 existing lines in 16 files now uncovered.

238508 of 262184 relevant lines covered (90.97%)

5724419.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.59
/src/realm/util/compression.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/util/compression.hpp>
20
#include <realm/util/safe_int_ops.hpp>
21
#include <realm/util/scope_exit.hpp>
22

23
#include <cstring>
24
#include <limits>
25
#include <map>
26
#include <zlib.h>
27
#include <zconf.h> // for zlib
28

29
#if REALM_USE_LIBCOMPRESSION
30
#include <compression.h>
31
#include <os/availability.h>
32
#endif
33

34
using namespace realm;
35
using namespace util;
36

37
namespace {
38

39
// Identifies how a payload is encoded. The numeric value is what
// read_header()/write_header() keep in the upper four bits of the first
// header byte, so changing a value changes the header encoding.
enum class Algorithm { None = 0, Deflate = 1, Lzfse = 2 };
44

45
using stream_avail_size_t = std::conditional_t<sizeof(uInt) < sizeof(size_t), uInt, size_t>;
46
constexpr stream_avail_size_t g_max_stream_avail = std::numeric_limits<stream_avail_size_t>::max();
47

48
stream_avail_size_t bounded_avail(size_t s)
49
{
100,602✔
50
    return s > g_max_stream_avail ? g_max_stream_avail : stream_avail_size_t(s);
100,602✔
51
}
100,602✔
52

53
// Drops constness and reinterprets a char buffer as the Bytef* expected by the
// stream structures used in this file. The pointer is used for both input and
// output buffers, so callers must not let the library write through a pointer
// that came from genuinely read-only data.
Bytef* to_bytef(const char* str)
{
    char* writable = const_cast<char*>(str);
    return reinterpret_cast<Bytef*>(writable);
}
167,040✔
57

58
class ErrorCategoryImpl : public std::error_category {
59
public:
60
    const char* name() const noexcept override final
61
    {
×
62
        return "realm::util::compression::error";
×
63
    }
×
64
    std::string message(int err) const override final
65
    {
×
66
        using error = realm::util::compression::error;
×
67
        error e = error(err);
×
68
        switch (e) {
×
69
            case error::out_of_memory:
×
70
                return "Out of memory";
×
71
            case error::compress_buffer_too_small:
×
72
                return "Compression buffer too small";
×
73
            case error::compress_error:
×
74
                return "Compression error";
×
75
            case error::compress_input_too_long:
×
76
                return "Compression input too long";
×
77
            case error::corrupt_input:
×
78
                return "Corrupt input data";
×
79
            case error::incorrect_decompressed_size:
×
80
                return "Decompressed data size not equal to expected size";
×
81
            case error::decompress_error:
×
82
                return "Decompression error";
×
83
            case error::decompress_unsupported:
×
84
                return "Decompression failed due to unsupported input compression";
×
85
        }
×
86
        REALM_UNREACHABLE();
87
    }
×
88
};
89

90
ErrorCategoryImpl g_error_category;
91

92
void* custom_alloc(void* opaque, unsigned int cnt, unsigned int size)
93
{
118,350✔
94
    using Alloc = realm::util::compression::Alloc;
118,350✔
95
    Alloc& alloc = *static_cast<Alloc*>(opaque);
118,350✔
96
    std::size_t accum_size = cnt * std::size_t(size);
118,350✔
97
    return alloc.alloc(accum_size);
118,350✔
98
}
118,350✔
99

100
void custom_free(void* opaque, void* addr)
101
{
118,350✔
102
    using Alloc = realm::util::compression::Alloc;
118,350✔
103
    Alloc& alloc = *static_cast<Alloc*>(opaque);
118,350✔
104
    return alloc.free(addr);
118,350✔
105
}
118,350✔
106

107
// Prepares the arena for a compression attempt: an arena that already has
// storage is reset for reuse, an empty one gets its initial storage.
void init_arena(compression::CompressMemoryArena& compress_memory_arena)
{
    if (compress_memory_arena.size() != 0) {
        compress_memory_arena.reset();
        return;
    }

    // Zlib documentation says that with default settings deflate requires
    // at most 268 KB. We round up slightly.
    constexpr auto initial_arena_size = 270 * 1024;
    compress_memory_arena.resize(initial_arena_size); // Throws
}
51,540✔
118

119
// Doubles the size of the arena, saturating at the largest size_t value if the
// doubling would overflow. The arena must already be non-empty and must not
// already be at the maximum size.
void grow_arena(compression::CompressMemoryArena& compress_memory_arena)
{
    std::size_t n = compress_memory_arena.size();
    REALM_ASSERT(n != 0);
    REALM_ASSERT(n != std::numeric_limits<std::size_t>::max());

    const bool doubling_overflowed = util::int_multiply_with_overflow_detect(n, 2);
    if (doubling_overflowed)
        n = std::numeric_limits<std::size_t>::max();

    compress_memory_arena.resize(n); // Throws
}
18,528✔
128

129
// Pops one byte off the front of `buf`, refilling `buf` from `is` first when it
// is empty. Returns 0 when the stream has no more data, which callers cannot
// tell apart from a genuine zero byte.
uint8_t read_byte(InputStream& is, Span<const char>& buf)
{
    if (buf.size() == 0)
        buf = is.next_block();
    if (buf.size() == 0)
        return 0;

    const char c = buf.front();
    buf = buf.sub_span(1);
    return c;
}
33,909✔
140

141
struct Header {
142
    Algorithm algorithm;
143
    size_t size;
144
};
145

146
Header read_header(InputStream& is, Span<const char>& buf)
147
{
332,961✔
148
    Header ret = {};
332,961✔
149
    auto first_byte = read_byte(is, buf);
332,961✔
150
    ret.algorithm = Algorithm(first_byte >> 4);
332,961✔
151
    size_t size_width = first_byte & 0b1111;
332,961✔
152
    if (size_width > sizeof(size_t))
332,961✔
153
        ret.size = -1;
12✔
154
    else {
332,949✔
155
        for (size_t i = 0; i < size_width; ++i) {
664,086✔
156
            ret.size += size_t(read_byte(is, buf)) << (i * 8);
331,137✔
157
        }
331,137✔
158
    }
332,949✔
159
    return ret;
332,961✔
160
}
332,961✔
161

162
// Number of bytes write_header() emits for `size`: one leading byte plus one
// byte for every non-zero-prefixed byte of `size`.
uint8_t header_width(size_t size)
{
    uint8_t width = 1; // the leading algorithm/width byte
    for (; size != 0; size >>= 8)
        ++width;
    return width;
}
143,631✔
171

172
// Serializes `h` at the start of `target` and returns the number of bytes
// written. Byte 0 holds the algorithm in its upper four bits and the count of
// size bytes in its lower bits; the size follows, least significant byte
// first, with leading zero bytes omitted. No bounds checking is done here, so
// `target` must be large enough (see header_width()).
size_t write_header(Header h, Span<char> target)
{
    uint8_t width = 0;
    size_t remaining = h.size;
    while (remaining != 0) {
        target[width + 1] = uint8_t(remaining & 0xFF);
        remaining >>= 8;
        ++width;
    }
    target[0] = (uint8_t(h.algorithm) << 4) + width;
    return width + 1;
}
153,882✔
182

183
// Feed in a zlib header to inflate() for the places we don't store it
184
void inflate_zlib_header(z_stream& strm)
185
{
5,040✔
186
    Bytef out;
5,040✔
187
    strm.avail_in = 2;
5,040✔
188
    strm.next_in = to_bytef("\x78\x5e");
5,040✔
189
    strm.avail_out = sizeof(out);
5,040✔
190
    strm.next_out = &out;
5,040✔
191

5,040✔
192
    int rc = inflate(&strm, Z_SYNC_FLUSH);
5,040✔
193
    REALM_ASSERT(rc == Z_OK);
5,040!
194
    REALM_ASSERT(strm.avail_in == 0);
5,040!
195
}
5,040✔
196

197
struct DecompressInputStreamNone final : public InputStream {
198
    DecompressInputStreamNone(InputStream& s, Span<const char> b)
199
        : source(s)
200
        , current_block(b)
201
    {
92,001✔
202
        if (current_block.empty())
92,001✔
203
            current_block = source.next_block();
537✔
204
    }
92,001✔
205
    InputStream& source;
206
    Span<const char> current_block;
207

208
    Span<const char> next_block() override
209
    {
325,017✔
210
        auto ret = current_block;
325,017✔
211
        if (ret.size())
325,017✔
212
            current_block = source.next_block();
233,022✔
213
        return ret;
325,017✔
214
    }
325,017✔
215
};
216

217
class DecompressInputStreamZlib final : public InputStream {
218
public:
219
    DecompressInputStreamZlib(InputStream& s, Span<const char> b, size_t total_size)
220
        : m_source(s)
221
    {
1,239✔
222
        // Arbitrary upper limit to reduce peak memory usage
1,239✔
223
        constexpr const size_t max_out_buffer_size = 1024 * 1024;
1,239✔
224
        m_buffer.reserve(std::min(total_size, max_out_buffer_size));
1,239✔
225

1,239✔
226
        int rc = inflateInit(&m_strm);
1,239✔
227
        if (rc != Z_OK)
1,239!
228
            throw std::system_error(make_error_code(compression::error::decompress_error), m_strm.msg);
1,239✔
229
        inflate_zlib_header(m_strm);
1,239✔
230

1,239✔
231
        m_strm.avail_in = bounded_avail(b.size());
1,239✔
232
        m_strm.next_in = to_bytef(b.data());
1,239✔
233
        m_current_block = b.sub_span(m_strm.avail_in);
1,239✔
234
    }
1,239✔
235

236
    ~DecompressInputStreamZlib()
237
    {
1,239✔
238
        inflateEnd(&m_strm);
1,239✔
239
    }
1,239✔
240

241
    Span<const char> next_block() override
242
    {
4,218✔
243
        m_buffer.resize(m_buffer.capacity());
4,218✔
244
        m_strm.avail_out = bounded_avail(m_buffer.size());
4,218✔
245
        m_strm.next_out = to_bytef(m_buffer.data());
4,218✔
246

4,218✔
247
        while (true) {
4,614✔
248
            // We may have some leftover input buffer from a previous call if the
4,614✔
249
            // inflated result didn't fit in the output buffer. If not, we need to
4,614✔
250
            // fetch the next block.
4,614✔
251
            if (m_strm.avail_in == 0) {
4,614!
252
                m_current_block = m_source.next_block();
2,325✔
253
                if (m_current_block.size()) {
2,325!
254
                    m_strm.next_in = to_bytef(m_current_block.data());
1,098✔
255
                    m_strm.avail_in = bounded_avail(m_current_block.size());
1,098✔
256
                }
1,098✔
257
            }
2,325✔
258

4,614✔
259
            m_strm.total_out = 0;
4,614✔
260
            auto rc = inflate(&m_strm, m_strm.avail_in ? Z_SYNC_FLUSH : Z_FINISH);
4,614!
261
            REALM_ASSERT(rc == Z_OK || rc == Z_STREAM_END || rc == Z_BUF_ERROR);
4,614!
262

4,614✔
263
            if (m_strm.total_out) {
4,614!
264
                // We got some output, so return that. We might also have reached
2,979✔
265
                // the end of the stream, which'll be reported on the next call
2,979✔
266
                // if so.
2,979✔
267
                REALM_ASSERT(m_strm.total_out <= m_buffer.capacity());
2,979!
268
                m_buffer.resize(m_strm.total_out);
2,979✔
269
                return m_buffer;
2,979✔
270
            }
2,979✔
271

1,635✔
272
            if (rc != Z_OK) {
1,635!
273
                // We reached the end of the stream without producing more data, so
1,239✔
274
                // we're done.
1,239✔
275
                return {nullptr, nullptr};
1,239✔
276
            }
1,239✔
277

1,635✔
278
            // Otherwise we produced no output but also didn't reach the end of the
1,635✔
279
            // stream, so we need to feed more data in.
1,635✔
280
        }
1,635✔
281
    }
4,218✔
282

283
private:
284
    InputStream& m_source;
285
    Span<const char> m_current_block;
286
    z_stream m_strm = {};
287
    AppendBuffer<char> m_buffer;
288
};
289

290
#if REALM_USE_LIBCOMPRESSION
291

292
// Maps the file's Algorithm to the matching libcompression algorithm. Anything
// other than Deflate or Lzfse yields a zero value, which callers treat as
// "unsupported".
compression_algorithm algorithm_to_compression_algorithm(Algorithm a)
{
    if (a == Algorithm::Deflate)
        return COMPRESSION_ZLIB;
    if (a == Algorithm::Lzfse)
        return COMPRESSION_LZFSE;
    return static_cast<compression_algorithm>(0);
}
12,696✔
303

304
API_AVAILABLE_BEGIN(macos(10.11))
305

306
class DecompressInputStreamLibCompression final : public InputStream {
307
public:
308
    DecompressInputStreamLibCompression(InputStream& s, Span<const char> b, Header h)
309
        : m_source(s)
310
    {
1,236✔
311
        // Arbitrary upper limit to reduce peak memory usage
312
        constexpr const size_t max_out_buffer_size = 1024 * 1024;
1,236✔
313
        m_buffer.reserve(std::min(h.size, max_out_buffer_size));
1,236✔
314
        auto rc = compression_stream_init(&m_strm, COMPRESSION_STREAM_DECODE,
1,236✔
315
                                          algorithm_to_compression_algorithm(h.algorithm));
1,236✔
316
        if (rc != COMPRESSION_STATUS_OK)
1,236✔
317
            throw std::system_error(compression::error::decompress_error);
318
        m_strm.src_size = b.size();
1,236✔
319
        m_strm.src_ptr = to_bytef(b.data());
1,236✔
320
    }
1,236✔
321

322
    ~DecompressInputStreamLibCompression()
323
    {
1,236✔
324
        compression_stream_destroy(&m_strm);
1,236✔
325
    }
1,236✔
326

327
    Span<const char> next_block() override
328
    {
3,531✔
329
        m_buffer.resize(m_buffer.capacity());
3,531✔
330
        m_strm.dst_size = m_buffer.size();
3,531✔
331
        m_strm.dst_ptr = to_bytef(m_buffer.data());
3,531✔
332

333
        while (true) {
5,013✔
334
            // We may have some leftover input buffer from a previous call if the
335
            // inflated result didn't fit in the output buffer. If not, we need to
336
            // fetch the next block.
337
            bool end = false;
5,013✔
338
            if (m_strm.src_size == 0) {
5,013✔
339
                if (auto block = m_source.next_block(); block.size()) {
3,681✔
340
                    m_strm.src_ptr = to_bytef(block.data());
1,509✔
341
                    m_strm.src_size = block.size();
1,509✔
342
                }
1,509✔
343
                else {
2,172✔
344
                    end = true;
2,172✔
345
                }
2,172✔
346
            }
3,681✔
347

348
            auto rc = compression_stream_process(&m_strm, end ? COMPRESSION_STREAM_FINALIZE : 0);
2,841✔
349
            if (rc == COMPRESSION_STATUS_ERROR)
5,013✔
350
                throw std::system_error(compression::error::corrupt_input);
351
            auto bytes_written = m_buffer.size() - m_strm.dst_size;
5,013✔
352
            if (bytes_written) {
5,013✔
353
                // We got some output, so return that. We might also have reached
354
                // the end of the stream, which'll be reported on the next call
355
                // if so.
356
                m_buffer.resize(bytes_written);
2,295✔
357
                return m_buffer;
2,295✔
358
            }
2,295✔
359
            if (rc == COMPRESSION_STATUS_END) {
2,718✔
360
                // We reached the end of the stream and are done
361
                return {nullptr, nullptr};
1,236✔
362
            }
1,236✔
363

364
            if (end) {
1,482✔
365
                // We ran out of input data but didn't get COMPRESSION_STATUS_END,
366
                // so the input is truncated
367
                throw std::system_error(compression::error::corrupt_input);
368
            }
369

370
            // Otherwise we produced no output but also didn't reach the end of the
371
            // stream, so we need to feed more data in.
372
        }
1,482✔
373
    }
3,531✔
374

375
private:
376
    InputStream& m_source;
377
    compression_stream m_strm = {};
378
    AppendBuffer<char> m_buffer;
379
};
380

381
API_AVAILABLE_END
382
#endif
383

384
std::error_code decompress_none(InputStream& compressed, Span<const char> compressed_buf, Span<char> decompressed_buf)
385
{
56,517✔
386
    do {
56,517✔
387
        auto count = std::min(decompressed_buf.size(), compressed_buf.size());
56,517✔
388
        std::memcpy(decompressed_buf.data(), compressed_buf.data(), count);
56,517✔
389
        decompressed_buf = decompressed_buf.sub_span(count);
56,517✔
390
        compressed_buf = compressed.next_block();
56,517✔
391
    } while (compressed_buf.size() && decompressed_buf.size());
56,517!
392

27,531✔
393
    if (compressed_buf.size() || decompressed_buf.size()) {
56,517✔
394
        return compression::error::incorrect_decompressed_size;
×
395
    }
×
396
    return std::error_code{};
56,517✔
397
}
56,517✔
398

399
std::error_code decompress_zlib(InputStream& compressed, Span<const char> compressed_buf, Span<char> decompressed_buf,
400
                                bool has_header)
401
{
10,323✔
402
    using namespace compression;
10,323✔
403

10,323✔
404
    z_stream strm = {};
10,323✔
405
    int rc = inflateInit(&strm);
10,323✔
406
    if (rc != Z_OK)
10,323!
407
        return error::decompress_error;
10,323✔
408
    util::ScopeExit cleanup([&]() noexcept {
10,323✔
409
        // inflateEnd() only fails if we modified the fields of strm in an invalid way
10,323✔
410
        int rc = inflateEnd(&strm);
10,323✔
411
        REALM_ASSERT(rc == Z_OK);
10,323!
412
        static_cast<void>(rc);
10,323✔
413
    });
10,323✔
414

10,323✔
415
    if (!has_header)
10,323!
416
        inflate_zlib_header(strm);
3,801✔
417

10,323✔
418
    do {
11,097✔
419
        size_t in_offset = 0;
11,097✔
420

11,097✔
421
        // This loop will typically run exactly once. If size_t is larger than
11,097✔
422
        // uInt (as it is on most 64-bit platforms), input or output larger than
11,097✔
423
        // uInt's upper bound will require multiple iterations of passing data
11,097✔
424
        // to zlib.
11,097✔
425
        while (in_offset < compressed_buf.size()) {
11,877!
426
            strm.avail_in = bounded_avail(compressed_buf.size() - in_offset);
11,100✔
427
            strm.next_in = to_bytef(compressed_buf.data() + in_offset);
11,100✔
428
            strm.next_out = to_bytef(decompressed_buf.data());
11,100✔
429
            strm.avail_out = bounded_avail(decompressed_buf.size());
11,100✔
430
            strm.total_in = 0;
11,100✔
431
            strm.total_out = 0;
11,100✔
432

11,100✔
433
            int rc = inflate(&strm, Z_SYNC_FLUSH);
11,100✔
434
            REALM_ASSERT(rc != Z_STREAM_ERROR && rc != Z_MEM_ERROR);
11,100!
435
            in_offset += strm.total_in;
11,100✔
436
            decompressed_buf = decompressed_buf.sub_span(strm.total_out);
11,100✔
437

11,100✔
438
            if (rc == Z_OK) {
11,100!
439
                // We made forward progress but did not reach the end
780✔
440
                continue;
780✔
441
            }
780✔
442
            if (rc == Z_STREAM_END) {
10,320!
443
                // If we got Z_STREAM_END and there's leftover input then the
10,314✔
444
                // data is invalid
10,314✔
445
                if (strm.avail_in || in_offset < compressed_buf.size() || compressed.next_block().size())
10,314!
446
                    return error::corrupt_input;
3✔
447
                if (decompressed_buf.size() != 0)
10,311!
448
                    return error::incorrect_decompressed_size;
3✔
449
                return std::error_code{};
10,308✔
450
            }
10,308✔
451
            if (rc == Z_NEED_DICT) {
6!
452
                // We don't support custom dictionaries
453
                return error::decompress_unsupported;
×
454
            }
×
455
            if (rc == Z_DATA_ERROR) {
6!
456
                return error::corrupt_input;
3✔
457
            }
3✔
458
            if (rc == Z_BUF_ERROR) {
3!
459
                if (strm.avail_out == 0) {
3!
460
                    if (decompressed_buf.size() > 0) {
3!
461
                        // We need to pass in the next range of the decompress buffer
462
                        continue;
×
463
                    }
×
464
                    // We should never run out of output buffer space unless the
3✔
465
                    // decompressed size was wrong.
3✔
466
                    return error::incorrect_decompressed_size;
3✔
467
                }
3✔
468
                // If there's space left in the output buffer then that means
469
                // we ran out of input without getting Z_STREAM_END
470
                return error::corrupt_input;
×
471
            }
×
472

473
            // Unknown error code
474
            REALM_UNREACHABLE();
UNCOV
475
        }
×
476
    } while ((compressed_buf = compressed.next_block()), compressed_buf.size());
11,097!
477

10,323✔
478
    if (strm.avail_in && !strm.avail_out) {
10,323!
479
        // Ran out of output buffer with remaining input
480
        return error::incorrect_decompressed_size;
×
481
    }
×
482

3✔
483
    // We ran out of input without getting Z_STREAM_END
3✔
484
    return error::corrupt_input;
3✔
485
}
3✔
486

487
#if REALM_USE_LIBCOMPRESSION
488
API_AVAILABLE_BEGIN(macos(10.11))
489
std::error_code decompress_libcompression(InputStream& compressed, Span<const char> compressed_buf,
490
                                          Span<char> decompressed_buf, Algorithm algorithm, bool has_header)
491
{
11,460✔
492
    using namespace compression;
11,460✔
493

494
    // If we're given a buffer with a zlib header we have to parse it outselves,
495
    // as libcompression doesn't handle it.
496
    if (has_header) {
11,460✔
497
        // The first nibble is compression algorithm (where 8 is DEFLATE), and second
498
        // nibble is window size. RFC 1950 only allows window size 7, so the first
499
        // byte must be 0x78.
500
        if (read_byte(compressed, compressed_buf) != 0x78)
6,945✔
501
            return error::corrupt_input;
502
        // The second byte has flags. Bit 5 is the only interesting one, which
503
        // indicates if a custom dictionary was used. We don't support that.
504
        uint8_t flags = read_byte(compressed, compressed_buf);
6,945✔
505
        if (flags & 0b100000)
6,945✔
506
            return error::decompress_unsupported;
507
        algorithm = Algorithm::Deflate;
6,945✔
508
    }
6,945✔
509

510
    auto compression_algorithm = algorithm_to_compression_algorithm(algorithm);
11,460✔
511
    if (!compression_algorithm)
11,460✔
512
        return error::decompress_unsupported;
3✔
513

514
    compression_stream strm;
11,457✔
515
    auto rc = compression_stream_init(&strm, COMPRESSION_STREAM_DECODE, compression_algorithm);
11,457✔
516
    if (rc != COMPRESSION_STATUS_OK)
11,457✔
517
        return error::decompress_error;
518

519
    // Using ScopeExit here warns about missing availability checking, but also
520
    // complains about redundant availability checking if it's added.
521
    struct Cleanup {
11,457✔
522
        compression_stream* strm;
11,457✔
523
        ~Cleanup()
11,457✔
524
        {
11,457✔
525
            compression_stream_destroy(strm);
11,457✔
526
        }
11,457✔
527
    } cleanup{&strm};
11,457✔
528

529
    strm.dst_size = decompressed_buf.size();
11,457✔
530
    strm.dst_ptr = to_bytef(decompressed_buf.data());
11,457✔
531

532
    uint32_t expected_checksum = 0;
11,457✔
533
    uLong actual_checksum = 1;
11,457✔
534
    do {
12,228✔
535
        strm.src_size = compressed_buf.size();
12,228✔
536
        strm.src_ptr = to_bytef(compressed_buf.data());
12,228✔
537

538
        // compression_stream_process() only writes 64 KB at a time, and you
539
        // have to call it in a loop until it stops giving more output before
540
        // feeding in more input
541
        while (rc != COMPRESSION_STATUS_END) {
36,543✔
542
            auto dst_ptr_start = strm.dst_ptr;
25,062✔
543
            rc = compression_stream_process(&strm, 0);
25,062✔
544
            if (rc == COMPRESSION_STATUS_ERROR)
25,062✔
545
                return error::corrupt_input;
546
            if (strm.dst_ptr == dst_ptr_start)
25,062✔
547
                break;
747✔
548

549
            // libcompression doesn't check the checksum, so do it manually.
550
            // This loop will never actually run multiple times as in practice
551
            // libcompression doesn't actually write more bytes than fit in uLong
552
            // in a single call to compression_stream_process()
553
            while (dst_ptr_start < strm.dst_ptr) {
48,630✔
554
                auto size = bounded_avail(strm.dst_ptr - dst_ptr_start);
24,315✔
555
                actual_checksum = adler32(actual_checksum, dst_ptr_start, size);
24,315✔
556
                dst_ptr_start += size;
24,315✔
557
            }
24,315✔
558
        }
24,315✔
559

560
        // The checksum at the end can potentially be straddling a block boundary
561
        // and we can't rewind, so maintain a rolling window of the last four
562
        // bytes seen.
563
        for (uint8_t byte : compressed_buf.last(std::min<size_t>(4u, compressed_buf.size()))) {
47,427✔
564
            expected_checksum <<= 8;
47,427✔
565
            expected_checksum += byte;
47,427✔
566
        }
47,427✔
567
    } while ((compressed_buf = compressed.next_block()), compressed_buf.size());
12,228✔
568
    rc = compression_stream_process(&strm, COMPRESSION_STREAM_FINALIZE);
11,457✔
569
    if (rc != COMPRESSION_STATUS_END)
11,457✔
570
        return error::corrupt_input;
6✔
571
    if (strm.dst_size != 0)
11,451✔
572
        return error::incorrect_decompressed_size;
3✔
573
    if (expected_checksum != actual_checksum)
11,448✔
574
        return error::corrupt_input;
6✔
575
    // Check for remaining extra input
576
    if (strm.src_size || compressed.next_block().size())
11,442✔
577
        return error::corrupt_input;
578
    return std::error_code{};
11,442✔
579
}
11,442✔
580
API_AVAILABLE_END
581
#endif
582

583
std::error_code decompress(InputStream& compressed, Span<const char> compressed_buf, Span<char> decompressed_buf,
584
                           Algorithm algorithm, bool has_header)
585
{
78,312✔
586
    using namespace compression;
78,312✔
587

37,863✔
588
    if (decompressed_buf.size() == 0) {
78,312✔
589
        return std::error_code{};
12✔
590
    }
12✔
591
    if (!compressed_buf.size()) {
78,300✔
592
        return error::incorrect_decompressed_size;
×
593
    }
×
594

37,857✔
595
#if REALM_USE_LIBCOMPRESSION
40,443✔
596
    // All of our non-macOS deployment targets are high enough to have libcompression,
597
    // but we support some older macOS versions
598
    if (__builtin_available(macOS 10.11, *)) {
40,446✔
599
        if (algorithm != Algorithm::None)
40,446✔
600
            return decompress_libcompression(compressed, compressed_buf, decompressed_buf, algorithm, has_header);
11,460✔
601
    }
28,983✔
602
#endif
28,983✔
603

37,857✔
604
    switch (algorithm) {
66,840✔
605
        case Algorithm::None:
56,517✔
606
            return decompress_none(compressed, compressed_buf, decompressed_buf);
56,517✔
607
        case Algorithm::Deflate:
10,323✔
608
            return decompress_zlib(compressed, compressed_buf, decompressed_buf, has_header);
10,323✔
609
        default:
3✔
610
            return error::decompress_unsupported;
3✔
611
    }
66,840✔
612
}
66,840✔
613

614
#if 0
// Disabled instrumentation. When enabled it keeps, per uncompressed size, the
// number of compressions and the sum of compressed sizes, and prints a summary
// to stderr when the static object is destroyed.
struct CompressionStats {
    std::mutex mutex;
    // uncompressed size -> (number of samples, total compressed bytes)
    std::map<size_t, std::pair<size_t, size_t>> stats;
    ~CompressionStats()
    {
        std::lock_guard lock(mutex);
        size_t total_uncompressed = 0;
        size_t total_compressed = 0;
        for (auto& [size, results] : stats) {
            const double percent = static_cast<double>(results.second) / results.first / size * 100;
            fprintf(stderr, "%zu: %zu %g\n", size, results.first, percent);
            total_uncompressed += size * results.first;
            total_compressed += results.second;
        }
        const double total_percent = (double)total_compressed / total_uncompressed * 100.0;
        fprintf(stderr, "total: %zu -> %zu (%g%%)\n", total_uncompressed, total_compressed, total_percent);
    }
} s_compression_stats;

void record_compression_result(size_t uncompressed, size_t compressed)
{
    std::lock_guard lock(s_compression_stats.mutex);
    auto& entry = s_compression_stats.stats[uncompressed];
    ++entry.first;
    entry.second += compressed;
}
#else
// Statistics gathering is compiled out, so recording a result does nothing.
void record_compression_result(size_t, size_t)
{
}
#endif
642

643
#if REALM_USE_LIBCOMPRESSION
644
API_AVAILABLE_BEGIN(macos(10.11))
645
std::error_code compress_lzfse(Span<const char> uncompressed_buf, Span<char> compressed_buf,
646
                               std::size_t& compressed_size, compression::Alloc* custom_allocator)
647
{
27,870✔
648
    using namespace compression;
27,870✔
649
    if (compressed_buf.size() < 4)
27,870✔
650
        return error::compress_buffer_too_small;
651
    // compression_encode_buffer() takes a size_t, but crashes if the value is
652
    // larger than 2^31. Using the stream API works, but it's slower for
653
    // normal-sized input, and we can just fall back to zlib for this edge case.
654
    if (uncompressed_buf.size() > std::numeric_limits<int32_t>::max())
27,870✔
655
        return error::compress_input_too_long;
656

657
    auto uncompressed_ptr = to_bytef(uncompressed_buf.data());
27,870✔
658
    auto uncompressed_size = uncompressed_buf.size();
27,870✔
659
    auto compressed_ptr = to_bytef(compressed_buf.data());
27,870✔
660
    auto compressed_buf_size = compressed_buf.size() - 4;
27,870✔
661

662
    void* scratch_buffer = nullptr;
27,870✔
663
    if (custom_allocator) {
27,870✔
664
        scratch_buffer = custom_allocator->alloc(compression_encode_scratch_buffer_size(COMPRESSION_LZFSE));
27,870✔
665
        if (!scratch_buffer)
27,870✔
666
            return error::out_of_memory;
18,528✔
667
    }
9,342✔
668

669
    size_t bytes = compression_encode_buffer(compressed_ptr, compressed_buf_size, uncompressed_ptr, uncompressed_size,
9,342✔
670
                                             scratch_buffer, COMPRESSION_LZFSE);
9,342✔
671
    if (bytes == 0)
9,342✔
672
        return error::compress_buffer_too_small;
12✔
673

674
    // Calculate the checksum and append it to the end of the stream
675
    uLong checksum = htonl(adler32(1, uncompressed_ptr, static_cast<uInt>(uncompressed_size)));
9,330✔
676
    for (int i = 0; i < 4; ++i) {
46,650✔
677
        compressed_buf[bytes + i] = checksum & 0xFF;
37,320✔
678
        checksum >>= 8;
37,320✔
679
    }
37,320✔
680
    compressed_size = bytes + 4;
9,330✔
681
    return std::error_code{};
9,330✔
682
}
9,330✔
683
API_AVAILABLE_END
684
#endif
685

686
std::error_code compress_lzfse_or_zlib(Span<const char> uncompressed_buf, Span<char> compressed_buf,
687
                                       std::size_t& compressed_size, int compression_level,
688
                                       compression::Alloc* custom_allocator)
689
{
36,168✔
690
    using namespace compression;
36,168✔
691
#if REALM_USE_LIBCOMPRESSION
27,870✔
692
    if (__builtin_available(macOS 10.11, *)) {
27,870✔
693
        size_t len = write_header({Algorithm::Lzfse, uncompressed_buf.size()}, compressed_buf);
27,870✔
694
        auto ec = compress_lzfse(uncompressed_buf, compressed_buf.sub_span(len), compressed_size, custom_allocator);
27,870✔
695
        if (ec != error::compress_input_too_long)
27,870✔
696
            return ec;
27,870✔
697
    }
698
#endif
699
    size_t len = header_width(uncompressed_buf.size());
8,298✔
700
    REALM_ASSERT(len >= 2);
8,298!
701
    auto ec = compress(uncompressed_buf, compressed_buf.sub_span(len - 2), compressed_size, compression_level,
8,298✔
702
                       custom_allocator);
8,298✔
703
    if (!ec) {
8,298!
704
        // Note: overwrites zlib header
8,286✔
705
        write_header({Algorithm::Deflate, uncompressed_buf.size()}, compressed_buf);
8,286✔
706
        compressed_size -= 2;
8,286✔
707
    }
8,286✔
708
    return ec;
8,298✔
709
}
8,298✔
710
} // unnamed namespace
711

712

713
const std::error_category& compression::error_category() noexcept
714
{
×
715
    return g_error_category;
×
716
}
×
717

718
// Wraps a compression `error` value in a std::error_code that belongs to the
// compression error category.
std::error_code compression::make_error_code(error error_code) noexcept
{
    const int raw_value = static_cast<int>(error_code);
    return {raw_value, g_error_category};
}
188,865✔
722

723

724
// zlib compression level: 1-9, 1 fastest.
725

726
// zlib deflateBound()
727
std::size_t compression::compress_bound(std::size_t size) noexcept
728
{
96✔
729
    // DEFLATE's worst-case size is a 6 byte zlib header, plus the uncompressed
48✔
730
    // data, plus a 5 byte header for every 16383 byte block.
48✔
731
    size_t overhead = 6 + 5 * (size / 16383 + 1);
96✔
732
    if (std::numeric_limits<size_t>::max() - overhead < size)
96✔
733
        return 0;
×
734
    return size + overhead;
96✔
735
}
96✔
736

737

738
// zlib deflate()
739
std::error_code compression::compress(Span<const char> uncompressed_buf, Span<char> compressed_buf,
740
                                      std::size_t& compressed_size, int compression_level, Alloc* custom_allocator)
741
{
23,772✔
742
    auto uncompressed_ptr = to_bytef(uncompressed_buf.data());
23,772✔
743
    auto uncompressed_size = uncompressed_buf.size();
23,772✔
744
    auto compressed_ptr = to_bytef(compressed_buf.data());
23,772✔
745
    auto compressed_buf_size = compressed_buf.size();
23,772✔
746

15,621✔
747
    z_stream strm = {};
23,772✔
748
    if (custom_allocator) {
23,772✔
749
        strm.opaque = custom_allocator;
23,670✔
750
        strm.zalloc = &custom_alloc;
23,670✔
751
        strm.zfree = &custom_free;
23,670✔
752
    }
23,670✔
753

15,621✔
754
    int rc = deflateInit(&strm, compression_level);
23,772✔
755
    if (rc == Z_MEM_ERROR)
23,772✔
756
        return error::out_of_memory;
15,621✔
757

15,621✔
758
    if (rc != Z_OK)
23,772✔
759
        return error::compress_error;
15,621✔
760

15,621✔
761
    strm.next_in = uncompressed_ptr;
23,772✔
762
    strm.avail_in = 0;
23,772✔
763
    strm.next_out = compressed_ptr;
23,772✔
764
    strm.avail_out = 0;
23,772✔
765

15,621✔
766
    std::size_t next_in_ndx = 0;
23,772✔
767
    std::size_t next_out_ndx = 0;
23,772✔
768
    REALM_ASSERT(rc == Z_OK);
23,772✔
769
    while (rc == Z_OK || rc == Z_BUF_ERROR) {
49,728✔
770
        REALM_ASSERT(strm.next_in + strm.avail_in == uncompressed_ptr + next_in_ndx);
28,140✔
771
        REALM_ASSERT(strm.next_out + strm.avail_out == compressed_ptr + next_out_ndx);
28,140✔
772

17,415✔
773
        bool stream_updated = false;
28,140✔
774

17,415✔
775
        if (strm.avail_in == 0 && next_in_ndx < uncompressed_size) {
28,140✔
776
            auto in_size = bounded_avail(uncompressed_size - next_in_ndx);
23,760✔
777
            next_in_ndx += in_size;
23,760✔
778
            strm.avail_in = uInt(in_size);
23,760✔
779
            stream_updated = true;
23,760✔
780
        }
23,760✔
781

17,415✔
782
        if (strm.avail_out == 0 && next_out_ndx < compressed_buf_size) {
28,140✔
783
            auto out_size = bounded_avail(compressed_buf_size - next_out_ndx);
23,772✔
784
            next_out_ndx += out_size;
23,772✔
785
            strm.avail_out = uInt(out_size);
23,772✔
786
            stream_updated = true;
23,772✔
787
        }
23,772✔
788

17,415✔
789
        if (rc == Z_BUF_ERROR && !stream_updated) {
28,140✔
790
            deflateEnd(&strm);
2,184✔
791
            return error::compress_buffer_too_small;
2,184✔
792
        }
2,184✔
793

16,518✔
794
        int flush = (next_in_ndx == uncompressed_size) ? Z_FINISH : Z_NO_FLUSH;
25,956✔
795

16,518✔
796
        rc = deflate(&strm, flush);
25,956✔
797
        REALM_ASSERT(rc != Z_STREAM_END || flush == Z_FINISH);
25,956✔
798
    }
25,956✔
799

15,621✔
800
    if (rc != Z_STREAM_END) {
22,485✔
801
        deflateEnd(&strm);
×
802
        return error::compress_error;
×
803
    }
×
804

14,724✔
805
    compressed_size = next_out_ndx - strm.avail_out;
21,588✔
806

14,724✔
807
    rc = deflateEnd(&strm);
21,588✔
808
    if (rc != Z_OK)
21,588✔
809
        return error::compress_error;
14,724✔
810

14,724✔
811
    return std::error_code{};
21,588✔
812
}
21,588✔
813

814
std::error_code compression::decompress(InputStream& compressed, Span<char> decompressed_buf)
815
{
120✔
816
    return ::decompress(compressed, compressed.next_block(), decompressed_buf, Algorithm::Deflate, true);
120✔
817
}
120✔
818

819
std::error_code compression::decompress(Span<const char> compressed_buf, Span<char> decompressed_buf)
820
{
13,359✔
821
    SimpleInputStream adapter(compressed_buf);
13,359✔
822
    return ::decompress(adapter, adapter.next_block(), decompressed_buf, Algorithm::Deflate, true);
13,359✔
823
}
13,359✔
824

825
std::error_code compression::decompress_nonportable(InputStream& compressed, AppendBuffer<char>& decompressed)
826
{
65,787✔
827
    auto compressed_buf = compressed.next_block();
65,787✔
828
    auto header = read_header(compressed, compressed_buf);
65,787✔
829
    if (header.size == std::numeric_limits<size_t>::max())
65,787✔
830
        return error::out_of_memory;
6✔
831
    decompressed.resize(header.size);
65,781✔
832
    if (header.size == 0)
65,781✔
833
        return std::error_code{};
945✔
834
    return ::decompress(compressed, compressed_buf, decompressed, header.algorithm, false);
64,836✔
835
}
64,836✔
836

837
std::error_code compression::allocate_and_compress(CompressMemoryArena& compress_memory_arena,
838
                                                   Span<const char> uncompressed_buf,
839
                                                   std::vector<char>& compressed_buf)
840
{
13,206✔
841
    const int compression_level = 1;
13,206✔
842
    std::size_t compressed_size = 0;
13,206✔
843

6,390✔
844
    if (compressed_buf.size() < 256)
13,206✔
845
        compressed_buf.resize(256); // Throws
12,606✔
846

6,390✔
847
    for (;;) {
15,372✔
848
        init_arena(compress_memory_arena);
15,372✔
849
        std::error_code ec = compression::compress(uncompressed_buf, compressed_buf, compressed_size,
15,372✔
850
                                                   compression_level, &compress_memory_arena);
15,372✔
851

7,272✔
852
        if (REALM_UNLIKELY(ec)) {
15,372✔
853
            if (ec == compression::error::compress_buffer_too_small) {
2,166✔
854
                std::size_t n = compressed_buf.size();
2,166✔
855
                REALM_ASSERT(n != std::numeric_limits<std::size_t>::max());
2,166✔
856
                if (util::int_multiply_with_overflow_detect(n, 2))
2,166✔
857
                    n = std::numeric_limits<std::size_t>::max();
×
858
                compressed_buf.resize(n); // Throws
2,166✔
859
                continue;
2,166✔
860
            }
2,166✔
861
            if (ec == compression::error::out_of_memory) {
×
862
                grow_arena(compress_memory_arena); // Throws
×
863
                continue;
×
864
            }
×
865
            return ec;
×
866
        }
×
867
        break;
13,206✔
868
    }
13,206✔
869
    compressed_buf.resize(compressed_size);
13,206✔
870
    return std::error_code{};
13,206✔
871
}
13,206✔
872

873
void compression::allocate_and_compress_nonportable(CompressMemoryArena& arena, Span<const char> uncompressed,
874
                                                    util::AppendBuffer<char>& compressed)
875
{
201,984✔
876
    if (uncompressed.size() == 0) {
201,984✔
877
        compressed.resize(0);
66,657✔
878
        return;
66,657✔
879
    }
66,657✔
880

66,660✔
881
    size_t header_size = header_width(uncompressed.size());
135,327✔
882
    compressed.resize(uncompressed.size() + header_size);
135,327✔
883
    size_t compressed_size = 0;
135,327✔
884
    // zlib is ineffective for very small sizes. Measured results indicate that
66,660✔
885
    // it only manages to compress at all past 100 bytes and the compression
66,660✔
886
    // ratio becomes interesting around 200 bytes.
66,660✔
887
    while (uncompressed.size() > 256) {
153,855✔
888
        init_arena(arena);
36,168✔
889
        const int compression_level = 1;
36,168✔
890
        auto ec = compress_lzfse_or_zlib(uncompressed, compressed, compressed_size, compression_level, &arena);
36,168✔
891
        if (ec == error::compress_buffer_too_small) {
36,168✔
892
            // Compressed result was larger than uncompressed, so just store the
12✔
893
            // uncompressed
12✔
894
            compressed_size = 0;
24✔
895
            break;
24✔
896
        }
24✔
897
        if (ec == compression::error::out_of_memory) {
36,144✔
898
            grow_arena(arena); // Throws
18,528✔
899
            continue;
18,528✔
900
        }
18,528✔
901
        if (ec) {
17,616✔
902
            throw std::system_error(ec);
×
903
        }
×
904
        REALM_ASSERT(compressed_size);
17,616✔
905
        compressed_size += header_size;
17,616✔
906
        record_compression_result(uncompressed.size(), compressed_size);
17,616✔
907
        compressed.resize(compressed_size);
17,616✔
908
        return;
17,616✔
909
    }
17,616✔
910

66,660✔
911
    // If compression made it grow or it was too small to compress then copy
66,660✔
912
    // the source over uncompressed
66,660✔
913
    if (!compressed_size) {
126,003✔
914
        record_compression_result(uncompressed.size(), uncompressed.size() + header_size);
117,726✔
915
        write_header({Algorithm::None, uncompressed.size()}, compressed);
117,726✔
916
        std::memcpy(compressed.data() + header_size, uncompressed.data(), uncompressed.size());
117,726✔
917
    }
117,726✔
918
}
117,711✔
919

920
// Convenience overload: compresses `uncompressed_buf` using a memory arena
// that lives only for the duration of this call and returns the resulting
// buffer by value.
util::AppendBuffer<char> compression::allocate_and_compress_nonportable(Span<const char> uncompressed_buf)
{
    util::compression::CompressMemoryArena scratch_arena;
    util::AppendBuffer<char> result;
    allocate_and_compress_nonportable(scratch_arena, uncompressed_buf, result);
    return result;
}
187,926✔
927

928
// Reads the header from `source` and returns an input stream that yields the
// decompressed data, chosen according to the algorithm named in the header.
//
// `total_size` receives the size stored in the header. It is left untouched
// when read_header() yields the maximum size_t value as the size; nullptr is
// returned in that case.
//
// nullptr is also returned when no decompressor handles the header's
// algorithm in this build.
std::unique_ptr<InputStream> compression::decompress_nonportable_input_stream(InputStream& source, size_t& total_size)
{
    auto first_block = source.next_block();
    auto header = read_header(source, first_block);
    if (header.size == std::numeric_limits<size_t>::max())
        return nullptr;

    total_size = header.size;
    const auto algorithm = header.algorithm;

    if (algorithm == Algorithm::None)
        return std::make_unique<DecompressInputStreamNone>(source, first_block);

#if REALM_USE_LIBCOMPRESSION
    if (__builtin_available(macOS 10.11, *)) {
        // Both Deflate and Lzfse are handed to libcompression when it is
        // available.
        const bool use_libcompression = (algorithm == Algorithm::Deflate || algorithm == Algorithm::Lzfse);
        if (use_libcompression)
            return std::make_unique<DecompressInputStreamLibCompression>(source, first_block, header);
    }
#endif

    if (algorithm != Algorithm::Deflate)
        return nullptr;

    return std::make_unique<DecompressInputStreamZlib>(source, first_block, total_size);
}
2,147,483,659✔
948

949
// Reads the header at the start of `source` and returns the size stored in
// it. This consumes the first block of `source`.
size_t compression::get_uncompressed_size_from_header(InputStream& source)
{
    auto first_block = source.next_block();
    const auto header = read_header(source, first_block);
    return header.size;
}
172,686✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc