• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1856

22 Nov 2023 04:32PM UTC coverage: 91.666% (-0.02%) from 91.689%
1856

push

Evergreen

web-flow
Fix duplication of recoverable list changes after unrecoverable changes (#7155)

After we copy a list in client reset recovery we need to not apply any changes
to that list in subsequent transactions as we copy directly to the final state
of the list and applying changes would just duplicate those changes.

This is only applicable to flexible sync when there's a subscription change in
between the write with unrecoverable changes and the write with recoverable
changes.

92274 of 169124 branches covered (0.0%)

77 of 77 new or added lines in 3 files covered. (100.0%)

120 existing lines in 16 files now uncovered.

231289 of 252317 relevant lines covered (91.67%)

6368856.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.56
/src/realm/util/compression.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/util/compression.hpp>
20
#include <realm/util/safe_int_ops.hpp>
21
#include <realm/util/scope_exit.hpp>
22

23
#include <cstring>
24
#include <limits>
25
#include <map>
26
#include <zlib.h>
27
#include <zconf.h> // for zlib
28

29
#if REALM_USE_LIBCOMPRESSION
30
#include <compression.h>
31
#include <os/availability.h>
32
#endif
33

34
using namespace realm;
35
using namespace util;
36

37
namespace {
38

39
// Identifies how a payload is encoded. The numeric value is what write_header()
// stores in (and read_header() extracts from) the high nibble of the first
// header byte, so existing values must not be renumbered.
enum class Algorithm {
    None = 0,    // payload is stored as-is
    Deflate = 1, // payload is a deflate stream
    Lzfse = 2,   // payload is an LZFSE stream
};
44

45
using stream_avail_size_t = std::conditional_t<sizeof(uInt) < sizeof(size_t), uInt, size_t>;
46
constexpr stream_avail_size_t g_max_stream_avail = std::numeric_limits<stream_avail_size_t>::max();
47

48
// Clamps a byte count to g_max_stream_avail so it can be assigned to a
// z_stream avail_in/avail_out field without truncation.
stream_avail_size_t bounded_avail(size_t s)
{
    if (s > g_max_stream_avail)
        return g_max_stream_avail;
    return static_cast<stream_avail_size_t>(s);
}
98,679✔
52

53
// Reinterprets a char buffer as the mutable Bytef* the zlib-style stream
// structs expect. Constness is cast away, so callers must only use the result
// as an input pointer unless the underlying buffer really is writable.
Bytef* to_bytef(const char* str)
{
    auto* mutable_chars = const_cast<char*>(str);
    return reinterpret_cast<Bytef*>(mutable_chars);
}
160,566✔
57

58
class ErrorCategoryImpl : public std::error_category {
59
public:
60
    const char* name() const noexcept override final
61
    {
×
62
        return "realm::util::compression::error";
×
63
    }
×
64
    std::string message(int err) const override final
65
    {
×
66
        using error = realm::util::compression::error;
×
67
        error e = error(err);
×
68
        switch (e) {
×
69
            case error::out_of_memory:
×
70
                return "Out of memory";
×
71
            case error::compress_buffer_too_small:
×
72
                return "Compression buffer too small";
×
73
            case error::compress_error:
×
74
                return "Compression error";
×
75
            case error::compress_input_too_long:
×
76
                return "Compression input too long";
×
77
            case error::corrupt_input:
×
78
                return "Corrupt input data";
×
79
            case error::incorrect_decompressed_size:
×
80
                return "Decompressed data size not equal to expected size";
×
81
            case error::decompress_error:
×
82
                return "Decompression error";
×
83
            case error::decompress_unsupported:
×
84
                return "Decompression failed due to unsupported input compression";
×
85
        }
×
86
        REALM_UNREACHABLE();
87
    }
×
88
};
89

90
// Single category instance referenced by every compression error_code.
ErrorCategoryImpl g_error_category{};
91

92
void* custom_alloc(void* opaque, unsigned int cnt, unsigned int size)
93
{
114,615✔
94
    using Alloc = realm::util::compression::Alloc;
114,615✔
95
    Alloc& alloc = *static_cast<Alloc*>(opaque);
114,615✔
96
    std::size_t accum_size = cnt * std::size_t(size);
114,615✔
97
    return alloc.alloc(accum_size);
114,615✔
98
}
114,615✔
99

100
void custom_free(void* opaque, void* addr)
101
{
114,615✔
102
    using Alloc = realm::util::compression::Alloc;
114,615✔
103
    Alloc& alloc = *static_cast<Alloc*>(opaque);
114,615✔
104
    return alloc.free(addr);
114,615✔
105
}
114,615✔
106

107
// Prepares the arena for a compression attempt: an arena that already has
// storage is reset, an empty one is given its initial size.
void init_arena(compression::CompressMemoryArena& compress_memory_arena)
{
    if (compress_memory_arena.size() != 0) {
        compress_memory_arena.reset();
        return;
    }
    // Zlib documentation says that with default settings deflate requires
    // at most 268 KB. We round up slightly.
    constexpr std::size_t initial_arena_size = 270 * 1024;
    compress_memory_arena.resize(initial_arena_size); // Throws
}
48,495✔
118

119
// Doubles the arena size, saturating at the maximum std::size_t value if the
// doubling is reported as overflowing.
void grow_arena(compression::CompressMemoryArena& compress_memory_arena)
{
    constexpr std::size_t max_size = std::numeric_limits<std::size_t>::max();
    std::size_t new_size = compress_memory_arena.size();
    REALM_ASSERT(new_size != 0);
    REALM_ASSERT(new_size != max_size);
    // A true result is treated as "the multiplication overflowed".
    const bool overflowed = util::int_multiply_with_overflow_detect(new_size, 2);
    if (overflowed)
        new_size = max_size;
    compress_memory_arena.resize(new_size); // Throws
}
17,040✔
128

129
// Pops one byte off the front of `buf`, refilling `buf` from `is` when it is
// empty. Returns 0 when the stream has no more data, so callers cannot
// distinguish end-of-input from a literal zero byte.
uint8_t read_byte(InputStream& is, Span<const char>& buf)
{
    if (buf.size() == 0)
        buf = is.next_block();
    if (buf.size() == 0)
        return 0;

    const char c = buf.front();
    buf = buf.sub_span(1);
    return c;
}
31,218✔
140

141
struct Header {
142
    Algorithm algorithm;
143
    size_t size;
144
};
145

146
// Parses a header from the stream. The first byte holds the algorithm in its
// high nibble and the number of size bytes in its low nibble; the size bytes
// follow in little-endian order. If the declared width does not fit in size_t
// the size is reported as the maximum size_t value and no size bytes are read.
Header read_header(InputStream& is, Span<const char>& buf)
{
    Header header = {};
    const uint8_t first_byte = read_byte(is, buf);
    header.algorithm = Algorithm(first_byte >> 4);

    const size_t size_width = first_byte & 0x0F;
    if (size_width > sizeof(size_t)) {
        header.size = size_t(-1);
        return header;
    }

    for (size_t shift = 0; shift < size_width * 8; shift += 8)
        header.size += size_t(read_byte(is, buf)) << shift;
    return header;
}
327,735✔
161

162
// Number of bytes a header for `size` occupies: one leading byte plus one byte
// for every non-zero-prefix byte of `size` (so a size of 0 needs just 1 byte).
uint8_t header_width(size_t size)
{
    uint8_t size_bytes = 0;
    for (; size != 0; size >>= 8)
        ++size_bytes;
    return size_bytes + 1;
}
131,541✔
171

172
// Serializes `h` at the start of `target` and returns the number of bytes
// written. Byte 0 gets the algorithm in its high nibble and the count of size
// bytes in its low nibble; the size follows in little-endian order. No bounds
// checking is done here, so `target` must be large enough for the header.
size_t write_header(Header h, Span<char> target)
{
    target[0] = uint8_t(h.algorithm) << 4;

    uint8_t width = 0;
    size_t remaining = h.size;
    while (remaining) {
        target[width + 1] = uint8_t(remaining & 0xFF);
        ++target[0]; // low nibble counts the size bytes emitted so far
        remaining >>= 8;
        ++width;
    }
    return width + 1;
}
141,117✔
182

183
// Feed in a zlib header to inflate() for the places we don't store it
184
void inflate_zlib_header(z_stream& strm)
185
{
4,941✔
186
    Bytef out;
4,941✔
187
    strm.avail_in = 2;
4,941✔
188
    strm.next_in = to_bytef("\x78\x5e");
4,941✔
189
    strm.avail_out = sizeof(out);
4,941✔
190
    strm.next_out = &out;
4,941✔
191

4,941✔
192
    int rc = inflate(&strm, Z_SYNC_FLUSH);
4,941✔
193
    REALM_ASSERT(rc == Z_OK);
4,941!
194
    REALM_ASSERT(strm.avail_in == 0);
4,941!
195
}
4,941✔
196

197
struct DecompressInputStreamNone final : public InputStream {
198
    DecompressInputStreamNone(InputStream& s, Span<const char> b)
199
        : source(s)
200
        , current_block(b)
201
    {
91,761✔
202
        if (current_block.empty())
91,761✔
203
            current_block = source.next_block();
687✔
204
    }
91,761✔
205
    InputStream& source;
206
    Span<const char> current_block;
207

208
    Span<const char> next_block() override
209
    {
324,360✔
210
        auto ret = current_block;
324,360✔
211
        if (ret.size())
324,360✔
212
            current_block = source.next_block();
232,611✔
213
        return ret;
324,360✔
214
    }
324,360✔
215
};
216

217
class DecompressInputStreamZlib final : public InputStream {
218
public:
219
    DecompressInputStreamZlib(InputStream& s, Span<const char> b, size_t total_size)
220
        : m_source(s)
221
    {
1,212✔
222
        // Arbitrary upper limit to reduce peak memory usage
1,212✔
223
        constexpr const size_t max_out_buffer_size = 1024 * 1024;
1,212✔
224
        m_buffer.reserve(std::min(total_size, max_out_buffer_size));
1,212✔
225

1,212✔
226
        int rc = inflateInit(&m_strm);
1,212✔
227
        if (rc != Z_OK)
1,212!
228
            throw std::system_error(make_error_code(compression::error::decompress_error), m_strm.msg);
1,212✔
229
        inflate_zlib_header(m_strm);
1,212✔
230

1,212✔
231
        m_strm.avail_in = bounded_avail(b.size());
1,212✔
232
        m_strm.next_in = to_bytef(b.data());
1,212✔
233
        m_current_block = b.sub_span(m_strm.avail_in);
1,212✔
234
    }
1,212✔
235

236
    ~DecompressInputStreamZlib()
237
    {
1,212✔
238
        inflateEnd(&m_strm);
1,212✔
239
    }
1,212✔
240

241
    Span<const char> next_block() override
242
    {
4,164✔
243
        m_buffer.resize(m_buffer.capacity());
4,164✔
244
        m_strm.avail_out = bounded_avail(m_buffer.size());
4,164✔
245
        m_strm.next_out = to_bytef(m_buffer.data());
4,164✔
246

4,164✔
247
        while (true) {
4,560✔
248
            // We may have some leftover input buffer from a previous call if the
4,560✔
249
            // inflated result didn't fit in the output buffer. If not, we need to
4,560✔
250
            // fetch the next block.
4,560✔
251
            if (m_strm.avail_in == 0) {
4,560!
252
                m_current_block = m_source.next_block();
2,298✔
253
                if (m_current_block.size()) {
2,298!
254
                    m_strm.next_in = to_bytef(m_current_block.data());
1,098✔
255
                    m_strm.avail_in = bounded_avail(m_current_block.size());
1,098✔
256
                }
1,098✔
257
            }
2,298✔
258

4,560✔
259
            m_strm.total_out = 0;
4,560✔
260
            auto rc = inflate(&m_strm, m_strm.avail_in ? Z_SYNC_FLUSH : Z_FINISH);
4,560!
261
            REALM_ASSERT(rc == Z_OK || rc == Z_STREAM_END || rc == Z_BUF_ERROR);
4,560!
262

4,560✔
263
            if (m_strm.total_out) {
4,560!
264
                // We got some output, so return that. We might also have reached
2,952✔
265
                // the end of the stream, which'll be reported on the next call
2,952✔
266
                // if so.
2,952✔
267
                REALM_ASSERT(m_strm.total_out <= m_buffer.capacity());
2,952!
268
                m_buffer.resize(m_strm.total_out);
2,952✔
269
                return m_buffer;
2,952✔
270
            }
2,952✔
271

1,608✔
272
            if (rc != Z_OK) {
1,608!
273
                // We reached the end of the stream without producing more data, so
1,212✔
274
                // we're done.
1,212✔
275
                return {nullptr, nullptr};
1,212✔
276
            }
1,212✔
277

1,608✔
278
            // Otherwise we produced no output but also didn't reach the end of the
1,608✔
279
            // stream, so we need to feed more data in.
1,608✔
280
        }
1,608✔
281
    }
4,164✔
282

283
private:
284
    InputStream& m_source;
285
    Span<const char> m_current_block;
286
    z_stream m_strm = {};
287
    AppendBuffer<char> m_buffer;
288
};
289

290
#if REALM_USE_LIBCOMPRESSION
291

292
// Maps our Algorithm to the matching libcompression constant. Algorithms
// libcompression cannot handle map to a zero value, which callers test for.
compression_algorithm algorithm_to_compression_algorithm(Algorithm a)
{
    if (a == Algorithm::Deflate)
        return COMPRESSION_ZLIB;
    if (a == Algorithm::Lzfse)
        return COMPRESSION_LZFSE;
    return static_cast<compression_algorithm>(0);
}
12,663✔
303

304
API_AVAILABLE_BEGIN(macos(10.11))
305

306
class DecompressInputStreamLibCompression final : public InputStream {
307
public:
308
    DecompressInputStreamLibCompression(InputStream& s, Span<const char> b, Header h)
309
        : m_source(s)
310
    {
1,212✔
311
        // Arbitrary upper limit to reduce peak memory usage
312
        constexpr const size_t max_out_buffer_size = 1024 * 1024;
1,212✔
313
        m_buffer.reserve(std::min(h.size, max_out_buffer_size));
1,212✔
314
        auto rc = compression_stream_init(&m_strm, COMPRESSION_STREAM_DECODE,
1,212✔
315
                                          algorithm_to_compression_algorithm(h.algorithm));
1,212✔
316
        if (rc != COMPRESSION_STATUS_OK)
1,212✔
317
            throw std::system_error(compression::error::decompress_error);
318
        m_strm.src_size = b.size();
1,212✔
319
        m_strm.src_ptr = to_bytef(b.data());
1,212✔
320
    }
1,212✔
321

322
    ~DecompressInputStreamLibCompression()
323
    {
1,212✔
324
        compression_stream_destroy(&m_strm);
1,212✔
325
    }
1,212✔
326

327
    Span<const char> next_block() override
328
    {
3,483✔
329
        m_buffer.resize(m_buffer.capacity());
3,483✔
330
        m_strm.dst_size = m_buffer.size();
3,483✔
331
        m_strm.dst_ptr = to_bytef(m_buffer.data());
3,483✔
332

333
        while (true) {
4,968✔
334
            // We may have some leftover input buffer from a previous call if the
335
            // inflated result didn't fit in the output buffer. If not, we need to
336
            // fetch the next block.
337
            bool end = false;
4,965✔
338
            if (m_strm.src_size == 0) {
4,965✔
339
                if (auto block = m_source.next_block(); block.size()) {
3,657✔
340
                    m_strm.src_ptr = to_bytef(block.data());
1,509✔
341
                    m_strm.src_size = block.size();
1,509✔
342
                }
1,509✔
343
                else {
2,148✔
344
                    end = true;
2,148✔
345
                }
2,148✔
346
            }
3,657✔
347

348
            auto rc = compression_stream_process(&m_strm, end ? COMPRESSION_STREAM_FINALIZE : 0);
2,817✔
349
            if (rc == COMPRESSION_STATUS_ERROR)
4,965✔
350
                throw std::system_error(compression::error::corrupt_input);
351
            auto bytes_written = m_buffer.size() - m_strm.dst_size;
4,965✔
352
            if (bytes_written) {
4,965✔
353
                // We got some output, so return that. We might also have reached
354
                // the end of the stream, which'll be reported on the next call
355
                // if so.
356
                m_buffer.resize(bytes_written);
2,268✔
357
                return m_buffer;
2,268✔
358
            }
2,268✔
359
            if (rc == COMPRESSION_STATUS_END) {
2,697✔
360
                // We reached the end of the stream and are done
361
                return {nullptr, nullptr};
1,212✔
362
            }
1,212✔
363

364
            if (end) {
1,485✔
365
                // We ran out of input data but didn't get COMPRESSION_STATUS_END,
366
                // so the input is truncated
367
                throw std::system_error(compression::error::corrupt_input);
368
            }
369

370
            // Otherwise we produced no output but also didn't reach the end of the
371
            // stream, so we need to feed more data in.
372
        }
1,485✔
373
    }
3,483✔
374

375
private:
376
    InputStream& m_source;
377
    compression_stream m_strm = {};
378
    AppendBuffer<char> m_buffer;
379
};
380

381
API_AVAILABLE_END
382
#endif
383

384
std::error_code decompress_none(InputStream& compressed, Span<const char> compressed_buf, Span<char> decompressed_buf)
385
{
56,382✔
386
    do {
56,382✔
387
        auto count = std::min(decompressed_buf.size(), compressed_buf.size());
56,382✔
388
        std::memcpy(decompressed_buf.data(), compressed_buf.data(), count);
56,382✔
389
        decompressed_buf = decompressed_buf.sub_span(count);
56,382✔
390
        compressed_buf = compressed.next_block();
56,382✔
391
    } while (compressed_buf.size() && decompressed_buf.size());
56,382!
392

27,666✔
393
    if (compressed_buf.size() || decompressed_buf.size()) {
56,382✔
394
        return compression::error::incorrect_decompressed_size;
×
395
    }
×
396
    return std::error_code{};
56,382✔
397
}
56,382✔
398

399
std::error_code decompress_zlib(InputStream& compressed, Span<const char> compressed_buf, Span<char> decompressed_buf,
400
                                bool has_header)
401
{
10,266✔
402
    using namespace compression;
10,266✔
403

10,266✔
404
    z_stream strm = {};
10,266✔
405
    int rc = inflateInit(&strm);
10,266✔
406
    if (rc != Z_OK)
10,266!
407
        return error::decompress_error;
10,266✔
408
    util::ScopeExit cleanup([&]() noexcept {
10,266✔
409
        // inflateEnd() only fails if we modified the fields of strm in an invalid way
10,266✔
410
        int rc = inflateEnd(&strm);
10,266✔
411
        REALM_ASSERT(rc == Z_OK);
10,266!
412
        static_cast<void>(rc);
10,266✔
413
    });
10,266✔
414

10,266✔
415
    if (!has_header)
10,266!
416
        inflate_zlib_header(strm);
3,729✔
417

10,266✔
418
    do {
11,040✔
419
        size_t in_offset = 0;
11,040✔
420

11,040✔
421
        // This loop will typically run exactly once. If size_t is larger than
11,040✔
422
        // uInt (as it is on most 64-bit platforms), input or output larger than
11,040✔
423
        // uInt's upper bound will require multiple iterations of passing data
11,040✔
424
        // to zlib.
11,040✔
425
        while (in_offset < compressed_buf.size()) {
11,820!
426
            strm.avail_in = bounded_avail(compressed_buf.size() - in_offset);
11,043✔
427
            strm.next_in = to_bytef(compressed_buf.data() + in_offset);
11,043✔
428
            strm.next_out = to_bytef(decompressed_buf.data());
11,043✔
429
            strm.avail_out = bounded_avail(decompressed_buf.size());
11,043✔
430
            strm.total_in = 0;
11,043✔
431
            strm.total_out = 0;
11,043✔
432

11,043✔
433
            int rc = inflate(&strm, Z_SYNC_FLUSH);
11,043✔
434
            REALM_ASSERT(rc != Z_STREAM_ERROR && rc != Z_MEM_ERROR);
11,043!
435
            in_offset += strm.total_in;
11,043✔
436
            decompressed_buf = decompressed_buf.sub_span(strm.total_out);
11,043✔
437

11,043✔
438
            if (rc == Z_OK) {
11,043!
439
                // We made forward progress but did not reach the end
780✔
440
                continue;
780✔
441
            }
780✔
442
            if (rc == Z_STREAM_END) {
10,263!
443
                // If we got Z_STREAM_END and there's leftover input then the
10,257✔
444
                // data is invalid
10,257✔
445
                if (strm.avail_in || in_offset < compressed_buf.size() || compressed.next_block().size())
10,257!
446
                    return error::corrupt_input;
3✔
447
                if (decompressed_buf.size() != 0)
10,254!
448
                    return error::incorrect_decompressed_size;
3✔
449
                return std::error_code{};
10,251✔
450
            }
10,251✔
451
            if (rc == Z_NEED_DICT) {
6!
452
                // We don't support custom dictionaries
453
                return error::decompress_unsupported;
×
454
            }
×
455
            if (rc == Z_DATA_ERROR) {
6!
456
                return error::corrupt_input;
3✔
457
            }
3✔
458
            if (rc == Z_BUF_ERROR) {
3!
459
                if (strm.avail_out == 0) {
3!
460
                    if (decompressed_buf.size() > 0) {
3!
461
                        // We need to pass in the next range of the decompress buffer
462
                        continue;
×
463
                    }
×
464
                    // We should never run out of output buffer space unless the
3✔
465
                    // decompressed size was wrong.
3✔
466
                    return error::incorrect_decompressed_size;
3✔
467
                }
3✔
468
                // If there's space left in the output buffer then that means
469
                // we ran out of input without getting Z_STREAM_END
470
                return error::corrupt_input;
×
471
            }
×
472

473
            // Unknown error code
474
            REALM_UNREACHABLE();
UNCOV
475
        }
×
476
    } while ((compressed_buf = compressed.next_block()), compressed_buf.size());
11,040!
477

10,266✔
478
    if (strm.avail_in && !strm.avail_out) {
10,266!
479
        // Ran out of output buffer with remaining input
480
        return error::incorrect_decompressed_size;
×
481
    }
×
482

3✔
483
    // We ran out of input without getting Z_STREAM_END
3✔
484
    return error::corrupt_input;
3✔
485
}
3✔
486

487
#if REALM_USE_LIBCOMPRESSION
488
API_AVAILABLE_BEGIN(macos(10.11))
489
std::error_code decompress_libcompression(InputStream& compressed, Span<const char> compressed_buf,
490
                                          Span<char> decompressed_buf, Algorithm algorithm, bool has_header)
491
{
11,451✔
492
    using namespace compression;
11,451✔
493

494
    // If we're given a buffer with a zlib header we have to parse it outselves,
495
    // as libcompression doesn't handle it.
496
    if (has_header) {
11,451✔
497
        // The first nibble is compression algorithm (where 8 is DEFLATE), and second
498
        // nibble is window size. RFC 1950 only allows window size 7, so the first
499
        // byte must be 0x78.
500
        if (read_byte(compressed, compressed_buf) != 0x78)
7,005✔
501
            return error::corrupt_input;
502
        // The second byte has flags. Bit 5 is the only interesting one, which
503
        // indicates if a custom dictionary was used. We don't support that.
504
        uint8_t flags = read_byte(compressed, compressed_buf);
7,005✔
505
        if (flags & 0b100000)
7,005✔
506
            return error::decompress_unsupported;
507
        algorithm = Algorithm::Deflate;
7,005✔
508
    }
7,005✔
509

510
    auto compression_algorithm = algorithm_to_compression_algorithm(algorithm);
11,451✔
511
    if (!compression_algorithm)
11,451✔
512
        return error::decompress_unsupported;
3✔
513

514
    compression_stream strm;
11,448✔
515
    auto rc = compression_stream_init(&strm, COMPRESSION_STREAM_DECODE, compression_algorithm);
11,448✔
516
    if (rc != COMPRESSION_STATUS_OK)
11,448✔
517
        return error::decompress_error;
518

519
    // Using ScopeExit here warns about missing availability checking, but also
520
    // complains about redundant availability checking if it's added.
521
    struct Cleanup {
11,448✔
522
        compression_stream* strm;
11,448✔
523
        ~Cleanup()
11,448✔
524
        {
11,448✔
525
            compression_stream_destroy(strm);
11,448✔
526
        }
11,448✔
527
    } cleanup{&strm};
11,448✔
528

529
    strm.dst_size = decompressed_buf.size();
11,448✔
530
    strm.dst_ptr = to_bytef(decompressed_buf.data());
11,448✔
531

532
    uint32_t expected_checksum = 0;
11,448✔
533
    uLong actual_checksum = 1;
11,448✔
534
    do {
12,219✔
535
        strm.src_size = compressed_buf.size();
12,219✔
536
        strm.src_ptr = to_bytef(compressed_buf.data());
12,219✔
537

538
        // compression_stream_process() only writes 64 KB at a time, and you
539
        // have to call it in a loop until it stops giving more output before
540
        // feeding in more input
541
        while (rc != COMPRESSION_STATUS_END) {
36,300✔
542
            auto dst_ptr_start = strm.dst_ptr;
24,828✔
543
            rc = compression_stream_process(&strm, 0);
24,828✔
544
            if (rc == COMPRESSION_STATUS_ERROR)
24,828✔
545
                return error::corrupt_input;
546
            if (strm.dst_ptr == dst_ptr_start)
24,828✔
547
                break;
747✔
548

549
            // libcompression doesn't check the checksum, so do it manually.
550
            // This loop will never actually run multiple times as in practice
551
            // libcompression doesn't actually write more bytes than fit in uLong
552
            // in a single call to compression_stream_process()
553
            while (dst_ptr_start < strm.dst_ptr) {
48,162✔
554
                auto size = bounded_avail(strm.dst_ptr - dst_ptr_start);
24,081✔
555
                actual_checksum = adler32(actual_checksum, dst_ptr_start, size);
24,081✔
556
                dst_ptr_start += size;
24,081✔
557
            }
24,081✔
558
        }
24,081✔
559

560
        // The checksum at the end can potentially be straddling a block boundary
561
        // and we can't rewind, so maintain a rolling window of the last four
562
        // bytes seen.
563
        for (uint8_t byte : compressed_buf.last(std::min<size_t>(4u, compressed_buf.size()))) {
47,391✔
564
            expected_checksum <<= 8;
47,391✔
565
            expected_checksum += byte;
47,391✔
566
        }
47,391✔
567
    } while ((compressed_buf = compressed.next_block()), compressed_buf.size());
12,219✔
568
    rc = compression_stream_process(&strm, COMPRESSION_STREAM_FINALIZE);
11,448✔
569
    if (rc != COMPRESSION_STATUS_END)
11,448✔
570
        return error::corrupt_input;
6✔
571
    if (strm.dst_size != 0)
11,442✔
572
        return error::incorrect_decompressed_size;
3✔
573
    if (expected_checksum != actual_checksum)
11,439✔
574
        return error::corrupt_input;
6✔
575
    // Check for remaining extra input
576
    if (strm.src_size || compressed.next_block().size())
11,433✔
577
        return error::corrupt_input;
578
    return std::error_code{};
11,433✔
579
}
11,433✔
580
API_AVAILABLE_END
581
#endif
582

583
std::error_code decompress(InputStream& compressed, Span<const char> compressed_buf, Span<char> decompressed_buf,
584
                           Algorithm algorithm, bool has_header)
585
{
78,114✔
586
    using namespace compression;
78,114✔
587

37,941✔
588
    if (decompressed_buf.size() == 0) {
78,114✔
589
        return std::error_code{};
12✔
590
    }
12✔
591
    if (!compressed_buf.size()) {
78,102✔
592
        return error::incorrect_decompressed_size;
×
593
    }
×
594

37,935✔
595
#if REALM_USE_LIBCOMPRESSION
40,167✔
596
    // All of our non-macOS deployment targets are high enough to have libcompression,
597
    // but we support some older macOS versions
598
    if (__builtin_available(macOS 10.11, *)) {
40,167✔
599
        if (algorithm != Algorithm::None)
40,167✔
600
            return decompress_libcompression(compressed, compressed_buf, decompressed_buf, algorithm, has_header);
11,451✔
601
    }
28,716✔
602
#endif
28,716✔
603

37,935✔
604
    switch (algorithm) {
66,651✔
605
        case Algorithm::None:
56,382✔
606
            return decompress_none(compressed, compressed_buf, decompressed_buf);
56,382✔
607
        case Algorithm::Deflate:
10,266✔
608
            return decompress_zlib(compressed, compressed_buf, decompressed_buf, has_header);
10,266✔
609
        default:
3✔
610
            return error::decompress_unsupported;
3✔
611
    }
66,651✔
612
}
66,651✔
613

614
// Optional compression statistics. Flip the condition to 1 to collect, per
// uncompressed size, how many buffers were compressed and their total
// compressed size; the summary is printed to stderr at process exit.
#if 0
struct CompressionStats {
    std::mutex mutex;
    // uncompressed size -> (number of buffers, sum of compressed sizes)
    std::map<size_t, std::pair<size_t, size_t>> stats;
    ~CompressionStats()
    {
        std::lock_guard lock(mutex);
        size_t total_uncompressed = 0;
        size_t total_compressed = 0;
        for (auto& [size, results] : stats) {
            const double percent = static_cast<double>(results.second) / results.first / size * 100;
            fprintf(stderr, "%zu: %zu %g\n", size, results.first, percent);
            total_uncompressed += size * results.first;
            total_compressed += results.second;
        }
        const double total_percent = (double)total_compressed / total_uncompressed * 100.0;
        fprintf(stderr, "total: %zu -> %zu (%g%%)\n", total_uncompressed, total_compressed, total_percent);
    }
} s_compression_stats;

void record_compression_result(size_t uncompressed, size_t compressed)
{
    std::lock_guard lock(s_compression_stats.mutex);
    auto& entry = s_compression_stats.stats[uncompressed];
    entry.first++;
    entry.second += compressed;
}
#else
// Statistics disabled: recording is a no-op.
void record_compression_result(size_t, size_t) {}
#endif
642

643
#if REALM_USE_LIBCOMPRESSION
644
API_AVAILABLE_BEGIN(macos(10.11))
645
std::error_code compress_lzfse(Span<const char> uncompressed_buf, Span<char> compressed_buf,
646
                               std::size_t& compressed_size, compression::Alloc* custom_allocator)
647
{
25,572✔
648
    using namespace compression;
25,572✔
649
    if (compressed_buf.size() < 4)
25,572✔
650
        return error::compress_buffer_too_small;
651
    // compression_encode_buffer() takes a size_t, but crashes if the value is
652
    // larger than 2^31. Using the stream API works, but it's slower for
653
    // normal-sized input, and we can just fall back to zlib for this edge case.
654
    if (uncompressed_buf.size() > std::numeric_limits<int32_t>::max())
25,572✔
655
        return error::compress_input_too_long;
656

657
    auto uncompressed_ptr = to_bytef(uncompressed_buf.data());
25,572✔
658
    auto uncompressed_size = uncompressed_buf.size();
25,572✔
659
    auto compressed_ptr = to_bytef(compressed_buf.data());
25,572✔
660
    auto compressed_buf_size = compressed_buf.size() - 4;
25,572✔
661

662
    void* scratch_buffer = nullptr;
25,572✔
663
    if (custom_allocator) {
25,572✔
664
        scratch_buffer = custom_allocator->alloc(compression_encode_scratch_buffer_size(COMPRESSION_LZFSE));
25,572✔
665
        if (!scratch_buffer)
25,572✔
666
            return error::out_of_memory;
17,040✔
667
    }
8,532✔
668

669
    size_t bytes = compression_encode_buffer(compressed_ptr, compressed_buf_size, uncompressed_ptr, uncompressed_size,
8,532✔
670
                                             scratch_buffer, COMPRESSION_LZFSE);
8,532✔
671
    if (bytes == 0)
8,532✔
672
        return error::compress_buffer_too_small;
12✔
673

674
    // Calculate the checksum and append it to the end of the stream
675
    uLong checksum = htonl(adler32(1, uncompressed_ptr, static_cast<uInt>(uncompressed_size)));
8,520✔
676
    for (int i = 0; i < 4; ++i) {
42,600✔
677
        compressed_buf[bytes + i] = checksum & 0xFF;
34,080✔
678
        checksum >>= 8;
34,080✔
679
    }
34,080✔
680
    compressed_size = bytes + 4;
8,520✔
681
    return std::error_code{};
8,520✔
682
}
8,520✔
683
API_AVAILABLE_END
684
#endif
685

686
std::error_code compress_lzfse_or_zlib(Span<const char> uncompressed_buf, Span<char> compressed_buf,
687
                                       std::size_t& compressed_size, int compression_level,
688
                                       compression::Alloc* custom_allocator)
689
{
33,057✔
690
    using namespace compression;
33,057✔
691
#if REALM_USE_LIBCOMPRESSION
25,572✔
692
    if (__builtin_available(macOS 10.11, *)) {
25,572✔
693
        size_t len = write_header({Algorithm::Lzfse, uncompressed_buf.size()}, compressed_buf);
25,572✔
694
        auto ec = compress_lzfse(uncompressed_buf, compressed_buf.sub_span(len), compressed_size, custom_allocator);
25,572✔
695
        if (ec != error::compress_input_too_long)
25,572✔
696
            return ec;
25,572✔
697
    }
698
#endif
699
    size_t len = header_width(uncompressed_buf.size());
7,485✔
700
    REALM_ASSERT(len >= 2);
7,485!
701
    auto ec = compress(uncompressed_buf, compressed_buf.sub_span(len - 2), compressed_size, compression_level,
7,485✔
702
                       custom_allocator);
7,485✔
703
    if (!ec) {
7,485!
704
        // Note: overwrites zlib header
7,473✔
705
        write_header({Algorithm::Deflate, uncompressed_buf.size()}, compressed_buf);
7,473✔
706
        compressed_size -= 2;
7,473✔
707
    }
7,473✔
708
    return ec;
7,485✔
709
}
7,485✔
710
} // unnamed namespace
711

712

713
const std::error_category& compression::error_category() noexcept
714
{
×
715
    return g_error_category;
×
716
}
×
717

718
// Wrap a compression `error` enumerator in a std::error_code that carries
// this file's error category.
std::error_code compression::make_error_code(error error_code) noexcept
{
    const int raw_value = static_cast<int>(error_code);
    return std::error_code(raw_value, g_error_category);
}
178,455✔
722

723

724
// zlib compression level: 1-9, 1 fastest.
725

726
// zlib deflateBound()
727
// Upper bound on the zlib-compressed size of `size` input bytes.
//
// The worst case is budgeted as a 6 byte zlib header, the uncompressed data
// itself, and a 5 byte block header for every 16383 bytes of input (plus one
// for the final partial block). Returns 0 when the bound does not fit in a
// std::size_t.
std::size_t compression::compress_bound(std::size_t size) noexcept
{
    constexpr std::size_t max_size = std::numeric_limits<std::size_t>::max();
    const std::size_t block_count = size / 16383 + 1;
    const std::size_t overhead = 6 + 5 * block_count;
    // `size + overhead` would wrap around; signal that with 0.
    return (size > max_size - overhead) ? 0 : size + overhead;
}
96✔
736

737

738
// zlib deflate()
739
std::error_code compression::compress(Span<const char> uncompressed_buf, Span<char> compressed_buf,
740
                                      std::size_t& compressed_size, int compression_level, Alloc* custom_allocator)
741
{
23,025✔
742
    auto uncompressed_ptr = to_bytef(uncompressed_buf.data());
23,025✔
743
    auto uncompressed_size = uncompressed_buf.size();
23,025✔
744
    auto compressed_ptr = to_bytef(compressed_buf.data());
23,025✔
745
    auto compressed_buf_size = compressed_buf.size();
23,025✔
746

14,802✔
747
    z_stream strm = {};
23,025✔
748
    if (custom_allocator) {
23,025✔
749
        strm.opaque = custom_allocator;
22,923✔
750
        strm.zalloc = &custom_alloc;
22,923✔
751
        strm.zfree = &custom_free;
22,923✔
752
    }
22,923✔
753

14,802✔
754
    int rc = deflateInit(&strm, compression_level);
23,025✔
755
    if (rc == Z_MEM_ERROR)
23,025✔
756
        return error::out_of_memory;
14,802✔
757

14,802✔
758
    if (rc != Z_OK)
23,025✔
759
        return error::compress_error;
14,802✔
760

14,802✔
761
    strm.next_in = uncompressed_ptr;
23,025✔
762
    strm.avail_in = 0;
23,025✔
763
    strm.next_out = compressed_ptr;
23,025✔
764
    strm.avail_out = 0;
23,025✔
765

14,802✔
766
    std::size_t next_in_ndx = 0;
23,025✔
767
    std::size_t next_out_ndx = 0;
23,025✔
768
    REALM_ASSERT(rc == Z_OK);
23,025✔
769
    while (rc == Z_OK || rc == Z_BUF_ERROR) {
48,234✔
770
        REALM_ASSERT(strm.next_in + strm.avail_in == uncompressed_ptr + next_in_ndx);
27,393✔
771
        REALM_ASSERT(strm.next_out + strm.avail_out == compressed_ptr + next_out_ndx);
27,393✔
772

16,566✔
773
        bool stream_updated = false;
27,393✔
774

16,566✔
775
        if (strm.avail_in == 0 && next_in_ndx < uncompressed_size) {
27,393✔
776
            auto in_size = bounded_avail(uncompressed_size - next_in_ndx);
23,013✔
777
            next_in_ndx += in_size;
23,013✔
778
            strm.avail_in = uInt(in_size);
23,013✔
779
            stream_updated = true;
23,013✔
780
        }
23,013✔
781

16,566✔
782
        if (strm.avail_out == 0 && next_out_ndx < compressed_buf_size) {
27,393✔
783
            auto out_size = bounded_avail(compressed_buf_size - next_out_ndx);
23,025✔
784
            next_out_ndx += out_size;
23,025✔
785
            strm.avail_out = uInt(out_size);
23,025✔
786
            stream_updated = true;
23,025✔
787
        }
23,025✔
788

16,566✔
789
        if (rc == Z_BUF_ERROR && !stream_updated) {
27,393✔
790
            deflateEnd(&strm);
2,184✔
791
            return error::compress_buffer_too_small;
2,184✔
792
        }
2,184✔
793

15,684✔
794
        int flush = (next_in_ndx == uncompressed_size) ? Z_FINISH : Z_NO_FLUSH;
25,209✔
795

15,684✔
796
        rc = deflate(&strm, flush);
25,209✔
797
        REALM_ASSERT(rc != Z_STREAM_END || flush == Z_FINISH);
25,209✔
798
    }
25,209✔
799

14,802✔
800
    if (rc != Z_STREAM_END) {
21,723✔
801
        deflateEnd(&strm);
×
802
        return error::compress_error;
×
803
    }
×
804

13,920✔
805
    compressed_size = next_out_ndx - strm.avail_out;
20,841✔
806

13,920✔
807
    rc = deflateEnd(&strm);
20,841✔
808
    if (rc != Z_OK)
20,841✔
809
        return error::compress_error;
13,920✔
810

13,920✔
811
    return std::error_code{};
20,841✔
812
}
20,841✔
813

814
std::error_code compression::decompress(InputStream& compressed, Span<char> decompressed_buf)
815
{
120✔
816
    return ::decompress(compressed, compressed.next_block(), decompressed_buf, Algorithm::Deflate, true);
120✔
817
}
120✔
818

819
std::error_code compression::decompress(Span<const char> compressed_buf, Span<char> decompressed_buf)
820
{
13,434✔
821
    SimpleInputStream adapter(compressed_buf);
13,434✔
822
    return ::decompress(adapter, adapter.next_block(), decompressed_buf, Algorithm::Deflate, true);
13,434✔
823
}
13,434✔
824

825
std::error_code compression::decompress_nonportable(InputStream& compressed, AppendBuffer<char>& decompressed)
826
{
65,367✔
827
    auto compressed_buf = compressed.next_block();
65,367✔
828
    auto header = read_header(compressed, compressed_buf);
65,367✔
829
    if (header.size == std::numeric_limits<size_t>::max())
65,367✔
830
        return error::out_of_memory;
6✔
831
    decompressed.resize(header.size);
65,361✔
832
    if (header.size == 0)
65,361✔
833
        return std::error_code{};
801✔
834
    return ::decompress(compressed, compressed_buf, decompressed, header.algorithm, false);
64,560✔
835
}
64,560✔
836

837
std::error_code compression::allocate_and_compress(CompressMemoryArena& compress_memory_arena,
838
                                                   Span<const char> uncompressed_buf,
839
                                                   std::vector<char>& compressed_buf)
840
{
13,272✔
841
    const int compression_level = 1;
13,272✔
842
    std::size_t compressed_size = 0;
13,272✔
843

6,399✔
844
    if (compressed_buf.size() < 256)
13,272✔
845
        compressed_buf.resize(256); // Throws
12,618✔
846

6,399✔
847
    for (;;) {
15,438✔
848
        init_arena(compress_memory_arena);
15,438✔
849
        std::error_code ec = compression::compress(uncompressed_buf, compressed_buf, compressed_size,
15,438✔
850
                                                   compression_level, &compress_memory_arena);
15,438✔
851

7,266✔
852
        if (REALM_UNLIKELY(ec)) {
15,438✔
853
            if (ec == compression::error::compress_buffer_too_small) {
2,166✔
854
                std::size_t n = compressed_buf.size();
2,166✔
855
                REALM_ASSERT(n != std::numeric_limits<std::size_t>::max());
2,166✔
856
                if (util::int_multiply_with_overflow_detect(n, 2))
2,166✔
857
                    n = std::numeric_limits<std::size_t>::max();
×
858
                compressed_buf.resize(n); // Throws
2,166✔
859
                continue;
2,166✔
860
            }
2,166✔
861
            if (ec == compression::error::out_of_memory) {
×
862
                grow_arena(compress_memory_arena); // Throws
×
863
                continue;
×
864
            }
×
865
            return ec;
×
866
        }
×
867
        break;
13,272✔
868
    }
13,272✔
869
    compressed_buf.resize(compressed_size);
13,272✔
870
    return std::error_code{};
13,272✔
871
}
13,272✔
872

873
void compression::allocate_and_compress_nonportable(CompressMemoryArena& arena, Span<const char> uncompressed,
874
                                                    util::AppendBuffer<char>& compressed)
875
{
186,855✔
876
    if (uncompressed.size() == 0) {
186,855✔
877
        compressed.resize(0);
62,793✔
878
        return;
62,793✔
879
    }
62,793✔
880

61,029✔
881
    size_t header_size = header_width(uncompressed.size());
124,062✔
882
    compressed.resize(uncompressed.size() + header_size);
124,062✔
883
    size_t compressed_size = 0;
124,062✔
884
    // zlib is ineffective for very small sizes. Measured results indicate that
61,029✔
885
    // it only manages to compress at all past 100 bytes and the compression
61,029✔
886
    // ratio becomes interesting around 200 bytes.
61,029✔
887
    while (uncompressed.size() > 256) {
141,102✔
888
        init_arena(arena);
33,057✔
889
        const int compression_level = 1;
33,057✔
890
        auto ec = compress_lzfse_or_zlib(uncompressed, compressed, compressed_size, compression_level, &arena);
33,057✔
891
        if (ec == error::compress_buffer_too_small) {
33,057✔
892
            // Compressed result was larger than uncompressed, so just store the
12✔
893
            // uncompressed
12✔
894
            compressed_size = 0;
24✔
895
            break;
24✔
896
        }
24✔
897
        if (ec == compression::error::out_of_memory) {
33,033✔
898
            grow_arena(arena); // Throws
17,040✔
899
            continue;
17,040✔
900
        }
17,040✔
901
        if (ec) {
15,993✔
902
            throw std::system_error(ec);
×
903
        }
×
904
        REALM_ASSERT(compressed_size);
15,993✔
905
        compressed_size += header_size;
15,993✔
906
        record_compression_result(uncompressed.size(), compressed_size);
15,993✔
907
        compressed.resize(compressed_size);
15,993✔
908
        return;
15,993✔
909
    }
15,993✔
910

61,029✔
911
    // If compression made it grow or it was too small to compress then copy
61,029✔
912
    // the source over uncompressed
61,029✔
913
    if (!compressed_size) {
115,542✔
914
        record_compression_result(uncompressed.size(), uncompressed.size() + header_size);
108,072✔
915
        write_header({Algorithm::None, uncompressed.size()}, compressed);
108,072✔
916
        std::memcpy(compressed.data() + header_size, uncompressed.data(), uncompressed.size());
108,072✔
917
    }
108,072✔
918
}
108,069✔
919

920
// Convenience overload: compress `uncompressed_buf` using a temporary arena
// that lives only for this call, and return the result by value.
util::AppendBuffer<char> compression::allocate_and_compress_nonportable(Span<const char> uncompressed_buf)
{
    util::compression::CompressMemoryArena scratch_arena;
    util::AppendBuffer<char> result;
    allocate_and_compress_nonportable(scratch_arena, uncompressed_buf, result);
    return result;
}
184,329✔
927

928
// Build an InputStream that yields the decompressed contents of `source`,
// which must begin with this file's header.
//
// `total_size` is set to the size recorded in the header. Returns nullptr,
// leaving `total_size` untouched, when the header reports the maximum size_t
// value. Also returns nullptr when no decompressor is available for the
// header's algorithm.
// NOTE(review): the maximum-size value looks like read_header's way of
// flagging an unusable header - confirm against read_header.
std::unique_ptr<InputStream> compression::decompress_nonportable_input_stream(InputStream& source, size_t& total_size)
{
    auto initial_block = source.next_block();
    auto header = read_header(source, initial_block);
    if (header.size == std::numeric_limits<size_t>::max())
        return nullptr;
    total_size = header.size;

    const bool is_deflate = (header.algorithm == Algorithm::Deflate);

    if (header.algorithm == Algorithm::None)
        return std::make_unique<DecompressInputStreamNone>(source, initial_block);
#if REALM_USE_LIBCOMPRESSION
    if (__builtin_available(macOS 10.11, *)) {
        // libcompression is used for both algorithms when it is available.
        if (is_deflate || header.algorithm == Algorithm::Lzfse)
            return std::make_unique<DecompressInputStreamLibCompression>(source, initial_block, header);
    }
#endif
    if (is_deflate)
        return std::make_unique<DecompressInputStreamZlib>(source, initial_block, total_size);
    return nullptr;
}
6✔
948

949
// Read the header at the start of `source` and return the size it records.
// This consumes the stream's first block.
size_t compression::get_uncompressed_size_from_header(InputStream& source)
{
    auto initial_block = source.next_block();
    const auto header = read_header(source, initial_block);
    return header.size;
}
168,189✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc