• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2322

20 May 2024 08:01PM UTC coverage: 90.811% (-0.02%) from 90.833%
2322

push

Evergreen

web-flow
RCORE-2099 Restore progress notifier behavior when sync session is already caught up (#7681)

101718 of 180092 branches covered (56.48%)

742 of 768 new or added lines in 4 files covered. (96.61%)

106 existing lines in 16 files now uncovered.

214769 of 236502 relevant lines covered (90.81%)

5853734.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.13
/test/object-store/sync/session/progress_notifications.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2017 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/sync_session.hpp>
20

21
#include <realm/util/scope_exit.hpp>
22

23
#if REALM_ENABLE_AUTH_TESTS
24
#include "util/test_file.hpp"
25
#include "util/sync/flx_sync_harness.hpp"
26
#include "util/sync/sync_test_utils.hpp"
27

28
#include <realm/object-store/impl/object_accessor_impl.hpp>
29
#include <realm/object-store/sync/async_open_task.hpp>
30
#include <realm/object-store/util/scheduler.hpp>
31

32
using namespace realm::app;
33
#endif
34

35
#include <catch2/catch_all.hpp>
36
#include <catch2/matchers/catch_matchers_floating_point.hpp>
37
using namespace Catch::Matchers;
38

39
#include <iomanip>
40

41
using namespace realm;
42
using NotifierType = SyncSession::ProgressDirection;
43

44
struct ProgressEntry {
45
    uint64_t transferred = 0;
46
    uint64_t transferrable = 0;
47
    double estimate = 0.0;
48

49
    inline bool operator==(const ProgressEntry& other) const noexcept
50
    {
48✔
51
        return transferred == other.transferred && transferrable == other.transferrable && estimate == other.estimate;
48✔
52
    }
48✔
53
};
54

55
static std::string estimate_to_string(double est)
56
{
229✔
57
    std::ostringstream ss;
229✔
58
    ss << std::setprecision(4) << est;
229✔
59
    return ss.str();
229✔
60
}
229✔
61

62
static std::ostream& operator<<(std::ostream& os, const ProgressEntry& value)
NEW
63
{
×
NEW
64
    return os << util::format("{ transferred: %1, transferrable: %2, estimate: %3 }", value.transferred,
×
NEW
65
                              value.transferrable, estimate_to_string(value.estimate));
×
NEW
66
}
×
67

68

69
struct WaitableProgress : public util::AtomicRefCountBase {
70
    WaitableProgress(const std::shared_ptr<util::Logger>& base_logger, std::string context)
71
        : logger(std::move(context), base_logger)
23✔
72
    {
46✔
73
    }
46✔
74

75
    std::function<SyncSession::ProgressNotifierCallback> make_cb()
76
    {
46✔
77
        auto self = util::bind_ptr(this);
46✔
78
        return [self](uint64_t transferred, uint64_t transferrable, double estimate) {
229✔
79
            self->logger.debug("Progress callback called xferred: %1, xferrable: %2, estimate: %3", transferred,
229✔
80
                               transferrable, estimate_to_string(estimate));
229✔
81
            std::lock_guard lk(self->mutex);
229✔
82
            self->entries.push_back(ProgressEntry{transferred, transferrable, estimate});
229✔
83
            self->cv.notify_one();
229✔
84
        };
229✔
85
    }
46✔
86

87
    bool empty()
88
    {
4✔
89
        std::lock_guard lk(mutex);
4✔
90
        return entries.empty();
4✔
91
    }
4✔
92

93
    std::vector<ProgressEntry> wait_for_full_sync()
94
    {
50✔
95
        std::unique_lock lk(mutex);
50✔
96
        if (!cv.wait_for(lk, std::chrono::seconds(30), [&] {
93✔
97
                return !entries.empty() && entries.back().transferred >= entries.back().transferrable &&
93✔
98
                       entries.back().estimate >= 1.0;
93✔
99
            })) {
93✔
NEW
100
            CAPTURE(entries);
×
NEW
101
            FAIL("Failed while waiting for progress to complete");
×
NEW
102
            return {};
×
NEW
103
        }
×
104

105
        std::vector<ProgressEntry> ret;
50✔
106
        std::swap(ret, entries);
50✔
107
        return ret;
50✔
108
    }
50✔
109

110
    util::PrefixLogger logger;
111
    std::mutex mutex;
112
    std::condition_variable cv;
113
    std::vector<ProgressEntry> entries;
114
};
115

116
struct TestInputValue {
117
    struct IsRegistration {};
118
    explicit TestInputValue(IsRegistration)
119
        : is_registration(true)
7✔
120
    {
14✔
121
    }
14✔
122

123
    TestInputValue(int64_t query_version, double cur_estimate, uint64_t transferred, uint64_t transferrable)
124
        : query_version(query_version)
36✔
125
        , cur_estimate(cur_estimate)
36✔
126
        , transferred(transferred)
36✔
127
        , transferrable(transferrable)
36✔
128
    {
72✔
129
    }
72✔
130

131
    int64_t query_version = 0;
132
    double cur_estimate = 0;
133
    uint64_t transferred = 0;
134
    uint64_t transferrable = 0;
135
    bool is_registration = false;
136
};
137

138
struct TestValues {
139
    std::vector<TestInputValue> input_values;
140
    std::vector<ProgressEntry> expected_values;
141
    int64_t registered_at_query_version;
142
};
143

144
TEST_CASE("progress notification", "[sync][session][progress]") {
44✔
145
    using NotifierType = SyncSession::ProgressDirection;
44✔
146
    _impl::SyncProgressNotifier progress;
44✔
147

148
    SECTION("callback is not called prior to first update") {
44✔
149
        bool callback_was_called = false;
2✔
150
        progress.register_callback(
2✔
151
            [&](auto, auto, double) {
2✔
NEW
152
                callback_was_called = true;
×
NEW
153
            },
×
154
            NotifierType::upload, false, 0);
2✔
155
        progress.register_callback(
2✔
156
            [&](auto, auto, double) {
2✔
NEW
157
                callback_was_called = true;
×
NEW
158
            },
×
159
            NotifierType::download, false, 0);
2✔
160
        REQUIRE_FALSE(callback_was_called);
2!
161
    }
2✔
162

163
    SECTION("callback is invoked immediately when a progress update has already occurred") {
44✔
164
        progress.set_local_version(1);
6✔
165
        progress.update(0, 0, 0, 0, 1, 0.0, 0.0, 0);
6✔
166

167
        bool callback_was_called = false;
6✔
168
        SECTION("for upload notifications, with no data transfer ongoing") {
6✔
169
            double estimate = 0.0;
2✔
170
            progress.register_callback(
2✔
171
                [&](auto, auto, double ep) {
2✔
172
                    callback_was_called = true;
2✔
173
                    estimate = ep;
2✔
174
                },
2✔
175
                NotifierType::upload, false, 0);
2✔
176
            REQUIRE(callback_was_called);
2!
177
            REQUIRE(estimate == 0.0);
2!
178
        }
2✔
179

180
        SECTION("for download notifications, with no data transfer ongoing") {
6✔
181
            double estimate = 0.0;
2✔
182
            progress.register_callback(
2✔
183
                [&](auto, auto, double ep) {
2✔
184
                    callback_was_called = true;
2✔
185
                    estimate = ep;
2✔
186
                },
2✔
187
                NotifierType::download, false, 0);
2✔
188
            REQUIRE(estimate == 0.0);
2!
189
            REQUIRE(callback_was_called);
2!
190
        }
2✔
191

192
        SECTION("can register another notifier while in the initial notification without deadlock") {
6✔
193
            int counter = 0;
2✔
194
            progress.register_callback(
2✔
195
                [&](auto, auto, double) {
2✔
196
                    counter++;
2✔
197
                    progress.register_callback(
2✔
198
                        [&](auto, auto, double) {
2✔
199
                            counter++;
2✔
200
                        },
2✔
201
                        NotifierType::upload, false, 0);
2✔
202
                },
2✔
203
                NotifierType::download, false, 0);
2✔
204
            REQUIRE(counter == 2);
2!
205
        }
2✔
206
    }
6✔
207

208
    SECTION("callback is invoked after each update for streaming notifiers") {
44✔
209
        progress.update(0, 0, 0, 0, 1, 0.0, 0.0, 0);
8✔
210

211
        bool callback_was_called = false;
8✔
212
        uint64_t transferred = 0;
8✔
213
        uint64_t transferrable = 0;
8✔
214
        uint64_t current_transferred = 0;
8✔
215
        uint64_t current_transferrable = 0;
8✔
216
        double estimate = 0.0;
8✔
217

218
        SECTION("for upload notifications") {
8✔
219
            progress.register_callback(
2✔
220
                [&](auto xferred, auto xferable, double ep) {
8✔
221
                    transferred = xferred;
8✔
222
                    transferrable = xferable;
8✔
223
                    callback_was_called = true;
8✔
224
                    estimate = ep;
8✔
225
                },
8✔
226
                NotifierType::upload, true, 0);
2✔
227
            REQUIRE(callback_was_called);
2!
228

229
            // Now manually call the notifier handler a few times.
230
            callback_was_called = false;
2✔
231
            current_transferred = 60;
2✔
232
            current_transferrable = 912;
2✔
233
            double current_estimate = current_transferred / double(current_transferrable);
2✔
234
            progress.update(25, 26, current_transferred, current_transferrable, 1, 25 / double(26), current_estimate,
2✔
235
                            0);
2✔
236
            CHECK(callback_was_called);
2!
237
            CHECK(transferred == current_transferred);
2!
238
            CHECK(transferrable == current_transferrable);
2!
239
            CHECK(estimate == current_estimate);
2!
240

241
            // Second callback
242
            callback_was_called = false;
2✔
243
            current_transferred = 79;
2✔
244
            current_transferrable = 1021;
2✔
245
            current_estimate = current_transferred / double(current_transferrable);
2✔
246
            progress.update(68, 191, current_transferred, current_transferrable, 1, 68 / double(191),
2✔
247
                            current_estimate, 0);
2✔
248
            CHECK(callback_was_called);
2!
249
            CHECK(transferred == current_transferred);
2!
250
            CHECK(transferrable == current_transferrable);
2!
251
            CHECK(estimate == current_estimate);
2!
252

253
            // Third callback
254
            callback_was_called = false;
2✔
255
            current_transferred = 150;
2✔
256
            current_transferrable = 1228;
2✔
257
            current_estimate = current_transferred / double(current_transferrable);
2✔
258
            progress.update(199, 591, current_transferred, current_transferrable, 1, 199 / double(591),
2✔
259
                            current_estimate, 0);
2✔
260
            CHECK(callback_was_called);
2!
261
            CHECK(transferred == current_transferred);
2!
262
            CHECK(transferrable == current_transferrable);
2!
263
            CHECK(estimate == current_estimate);
2!
264
        }
2✔
265

266
        SECTION("for download notifications") {
8✔
267
            progress.register_callback(
2✔
268
                [&](auto xferred, auto xferable, double pe) {
8✔
269
                    transferred = xferred;
8✔
270
                    transferrable = xferable;
8✔
271
                    estimate = pe;
8✔
272
                    callback_was_called = true;
8✔
273
                },
8✔
274
                NotifierType::download, true, 0);
2✔
275
            REQUIRE(callback_was_called);
2!
276

277
            // Now manually call the notifier handler a few times.
278
            callback_was_called = false;
2✔
279
            current_transferred = 60;
2✔
280
            current_transferrable = 912;
2✔
281
            progress.update(current_transferred, current_transferrable, 25, 26, 1,
2✔
282
                            current_transferred / double(current_transferrable), 1.0, 0);
2✔
283
            CHECK(callback_was_called);
2!
284
            CHECK(transferred == current_transferred);
2!
285
            CHECK(transferrable == current_transferrable);
2!
286
            CHECK(estimate == current_transferred / double(current_transferrable));
2!
287

288
            // Second callback
289
            callback_was_called = false;
2✔
290
            current_transferred = 79;
2✔
291
            current_transferrable = 1021;
2✔
292
            progress.update(current_transferred, current_transferrable, 68, 191, 1,
2✔
293
                            current_transferred / double(current_transferrable), 1.0, 0);
2✔
294
            CHECK(callback_was_called);
2!
295
            CHECK(transferred == current_transferred);
2!
296
            CHECK(transferrable == current_transferrable);
2!
297
            CHECK(estimate == current_transferred / double(current_transferrable));
2!
298

299
            // Third callback
300
            callback_was_called = false;
2✔
301
            current_transferred = 150;
2✔
302
            current_transferrable = 1228;
2✔
303
            progress.update(current_transferred, current_transferrable, 199, 591, 1,
2✔
304
                            current_transferred / double(current_transferrable), 1.0, 0);
2✔
305
            CHECK(callback_was_called);
2!
306
            CHECK(transferred == current_transferred);
2!
307
            CHECK(transferrable == current_transferrable);
2!
308
        }
2✔
309

310
        SECTION("token unregistration works") {
8✔
311
            uint64_t token = progress.register_callback(
2✔
312
                [&](auto xferred, auto xferable, double) {
4✔
313
                    transferred = xferred;
4✔
314
                    transferrable = xferable;
4✔
315
                    callback_was_called = true;
4✔
316
                },
4✔
317
                NotifierType::download, true, 0);
2✔
318
            REQUIRE(callback_was_called);
2!
319

320
            // Now manually call the notifier handler a few times.
321
            callback_was_called = false;
2✔
322
            current_transferred = 60;
2✔
323
            current_transferrable = 912;
2✔
324
            double current_estimate = current_transferred / double(current_transferrable);
2✔
325
            progress.update(current_transferred, current_transferrable, 25, 26, 1, current_estimate, 25 / double(26),
2✔
326
                            0);
2✔
327
            CHECK(callback_was_called);
2!
328
            CHECK(transferred == current_transferred);
2!
329
            CHECK(transferrable == current_transferrable);
2!
330

331
            // Unregister
332
            progress.unregister_callback(token);
2✔
333

334
            // Second callback: should not actually do anything.
335
            callback_was_called = false;
2✔
336
            current_transferred = 150;
2✔
337
            current_transferrable = 1228;
2✔
338
            current_estimate = current_transferred / double(current_transferrable);
2✔
339
            progress.update(current_transferred, current_transferrable, 199, 591, 1, current_estimate,
2✔
340
                            199 / double(591), 0);
2✔
341
            CHECK(!callback_was_called);
2!
342
        }
2✔
343

344
        SECTION("for multiple notifiers") {
8✔
345
            progress.register_callback(
2✔
346
                [&](auto xferred, auto xferable, double ep) {
6✔
347
                    transferred = xferred;
6✔
348
                    transferrable = xferable;
6✔
349
                    estimate = ep;
6✔
350
                    callback_was_called = true;
6✔
351
                },
6✔
352
                NotifierType::download, true, 0);
2✔
353
            REQUIRE(callback_was_called);
2!
354

355
            // Register a second notifier.
356
            bool callback_was_called_2 = false;
2✔
357
            uint64_t transferred_2 = 0;
2✔
358
            uint64_t transferrable_2 = 0;
2✔
359
            double upload_estimate = 0.0;
2✔
360
            progress.register_callback(
2✔
361
                [&](auto xferred, auto xferable, double ep) {
6✔
362
                    transferred_2 = xferred;
6✔
363
                    transferrable_2 = xferable;
6✔
364
                    callback_was_called_2 = true;
6✔
365
                    upload_estimate = ep;
6✔
366
                },
6✔
367
                NotifierType::upload, true, 0);
2✔
368
            REQUIRE(callback_was_called_2);
2!
369

370
            // Now manually call the notifier handler a few times.
371
            callback_was_called = false;
2✔
372
            callback_was_called_2 = false;
2✔
373
            uint64_t current_uploaded = 16;
2✔
374
            uint64_t current_uploadable = 201;
2✔
375
            uint64_t current_downloaded = 68;
2✔
376
            uint64_t current_downloadable = 182;
2✔
377
            auto current_down_estimate = current_downloaded / double(current_downloadable);
2✔
378
            auto current_up_estimate = current_uploaded / double(current_uploadable);
2✔
379
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
380
                            current_down_estimate, current_up_estimate, 0);
2✔
381
            CHECK(callback_was_called);
2!
382
            CHECK(transferred == current_downloaded);
2!
383
            CHECK(transferrable == current_downloadable);
2!
384
            CHECK(estimate == current_down_estimate);
2!
385
            CHECK(callback_was_called_2);
2!
386
            CHECK(transferred_2 == current_uploaded);
2!
387
            CHECK(transferrable_2 == current_uploadable);
2!
388
            CHECK(upload_estimate == current_up_estimate);
2!
389

390
            // Second callback
391
            callback_was_called = false;
2✔
392
            callback_was_called_2 = false;
2✔
393
            current_uploaded = 31;
2✔
394
            current_uploadable = 329;
2✔
395
            current_downloaded = 76;
2✔
396
            current_downloadable = 191;
2✔
397
            current_down_estimate = current_downloaded / double(current_downloadable);
2✔
398
            current_up_estimate = current_uploaded / double(current_uploadable);
2✔
399
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
400
                            current_down_estimate, current_up_estimate, 0);
2✔
401
            CHECK(callback_was_called);
2!
402
            CHECK(transferred == current_downloaded);
2!
403
            CHECK(transferrable == current_downloadable);
2!
404
            CHECK(estimate == current_down_estimate);
2!
405
            CHECK(callback_was_called_2);
2!
406
            CHECK(transferred_2 == current_uploaded);
2!
407
            CHECK(transferrable_2 == current_uploadable);
2!
408
            CHECK(current_up_estimate == upload_estimate);
2!
409
        }
2✔
410
    }
8✔
411

412
    SECTION("properly runs for non-streaming notifiers") {
44✔
413
        bool callback_was_called = false;
14✔
414
        uint64_t transferred = 0;
14✔
415
        uint64_t transferrable = 0;
14✔
416
        uint64_t current_transferred = 0;
14✔
417
        uint64_t current_transferrable = 0;
14✔
418
        double upload_estimate = 0;
14✔
419
        double download_estimate = 0;
14✔
420

421
        SECTION("for upload notifications") {
14✔
422
            // Prime the progress updater
423
            current_transferred = 60;
2✔
424
            current_transferrable = 501;
2✔
425
            const uint64_t original_transferrable = current_transferrable;
2✔
426
            double current_estimate = current_transferred / double(current_transferrable);
2✔
427
            progress.update(21, 26, current_transferred, current_transferrable, 1, 21 / double(26), current_estimate,
2✔
428
                            0);
2✔
429

430
            progress.register_callback(
2✔
431
                [&](auto xferred, auto xferable, double ep) {
6✔
432
                    transferred = xferred;
6✔
433
                    transferrable = xferable;
6✔
434
                    upload_estimate = ep;
6✔
435
                    callback_was_called = true;
6✔
436
                },
6✔
437
                NotifierType::upload, false, 0);
2✔
438
            REQUIRE(callback_was_called);
2!
439

440
            // Now manually call the notifier handler a few times.
441
            callback_was_called = false;
2✔
442
            current_transferred = 66;
2✔
443
            current_transferrable = 582;
2✔
444
            current_estimate = current_transferred / double(current_transferrable);
2✔
445
            progress.update(21, 26, current_transferred, current_transferrable, 1, 21 / double(26), current_estimate,
2✔
446
                            0);
2✔
447
            CHECK(callback_was_called);
2!
448
            CHECK(transferred == current_transferred);
2!
449
            CHECK(transferrable == original_transferrable);
2!
450
            CHECK(upload_estimate == current_transferred / double(original_transferrable));
2!
451

452
            // Second callback
453
            callback_was_called = false;
2✔
454
            current_transferred = original_transferrable + 100;
2✔
455
            current_transferrable = 1021;
2✔
456
            current_estimate = current_transferred / double(current_transferrable);
2✔
457
            progress.update(68, 191, current_transferred, current_transferrable, 1, 68 / double(191),
2✔
458
                            current_estimate, 0);
2✔
459
            CHECK(callback_was_called);
2!
460
            CHECK(transferred == current_transferred);
2!
461
            CHECK(transferrable == original_transferrable);
2!
462
            CHECK(upload_estimate == 1.0);
2!
463

464
            // The notifier should be unregistered at this point, and not fire.
465
            callback_was_called = false;
2✔
466
            current_transferred = original_transferrable + 250;
2✔
467
            current_transferrable = 1228;
2✔
468
            current_estimate = current_transferred / double(current_transferrable);
2✔
469
            progress.update(199, 591, current_transferred, current_transferrable, 1, 199 / double(591),
2✔
470
                            current_estimate, 0);
2✔
471
            CHECK(!callback_was_called);
2!
472
        }
2✔
473

474
        SECTION("upload notifications are not sent until all local changesets have been processed") {
14✔
475
            progress.set_local_version(4);
2✔
476

477
            progress.register_callback(
2✔
478
                [&](auto xferred, auto xferable, double) {
2✔
479
                    transferred = xferred;
2✔
480
                    transferrable = xferable;
2✔
481
                    callback_was_called = true;
2✔
482
                },
2✔
483
                NotifierType::upload, false, 0);
2✔
484
            REQUIRE_FALSE(callback_was_called);
2!
485

486
            current_transferred = 66;
2✔
487
            current_transferrable = 582;
2✔
488
            double current_estimate = current_transferred / double(current_transferrable);
2✔
489
            progress.update(0, 0, current_transferred, current_transferrable, 3, 1.0, current_estimate, 0);
2✔
490
            REQUIRE_FALSE(callback_was_called);
2!
491

492
            current_transferred = 77;
2✔
493
            current_transferrable = 1021;
2✔
494
            current_estimate = current_transferred / double(current_transferrable);
2✔
495
            progress.update(0, 0, current_transferred, current_transferrable, 4, 1.0, current_estimate, 0);
2✔
496
            REQUIRE(callback_was_called);
2!
497
            CHECK(transferred == current_transferred);
2!
498
            // should not have captured transferrable from the first update
499
            CHECK(transferrable == current_transferrable);
2!
500
            CHECK(current_estimate == current_estimate);
2!
501
        }
2✔
502

503
        SECTION("for download notifications") {
14✔
504
            // Prime the progress updater
505
            current_transferred = 60;
2✔
506
            current_transferrable = 501;
2✔
507
            double current_estimate = current_transferred / double(current_transferrable);
2✔
508
            const uint64_t original_transferrable = current_transferrable;
2✔
509
            progress.update(current_transferred, current_transferrable, 21, 26, 1, current_estimate, 21 / double(26),
2✔
510
                            0);
2✔
511

512
            progress.register_callback(
2✔
513
                [&](auto xferred, auto xferable, double ep) {
6✔
514
                    transferred = xferred;
6✔
515
                    transferrable = xferable;
6✔
516
                    download_estimate = ep;
6✔
517
                    callback_was_called = true;
6✔
518
                },
6✔
519
                NotifierType::download, false, 0);
2✔
520
            REQUIRE(callback_was_called);
2!
521

522
            // Now manually call the notifier handler a few times.
523
            callback_was_called = false;
2✔
524
            current_transferred = 66;
2✔
525
            current_transferrable = 582;
2✔
526
            current_estimate = current_transferred / double(current_transferrable);
2✔
527
            progress.update(current_transferred, current_transferrable, 25, 26, 1, current_estimate, 25 / double(26),
2✔
528
                            0);
2✔
529
            CHECK(callback_was_called);
2!
530
            CHECK(transferred == current_transferred);
2!
531
            CHECK(transferrable == original_transferrable);
2!
532
            CHECK(download_estimate == current_estimate);
2!
533

534
            // Second callback
535
            callback_was_called = false;
2✔
536
            current_transferred = original_transferrable + 100;
2✔
537
            current_transferrable = 1021;
2✔
538
            current_estimate = current_transferred / double(current_transferrable);
2✔
539
            progress.update(current_transferred, current_transferrable, 68, 191, 1, current_estimate,
2✔
540
                            68 / double(191), 0);
2✔
541
            CHECK(callback_was_called);
2!
542
            CHECK(transferred == current_transferred);
2!
543
            CHECK(transferrable == original_transferrable);
2!
544
            CHECK(download_estimate == current_estimate);
2!
545

546
            // The notifier should be unregistered at this point, and not fire.
547
            callback_was_called = false;
2✔
548
            current_transferred = original_transferrable + 250;
2✔
549
            current_transferrable = 1228;
2✔
550
            current_estimate = current_transferred / double(current_transferrable);
2✔
551
            progress.update(current_transferred, current_transferrable, 199, 591, 1, current_estimate,
2✔
552
                            199 / double(591), 0);
2✔
553
            CHECK(!callback_was_called);
2!
554
        }
2✔
555

556
        SECTION("token unregistration works") {
14✔
557
            // Prime the progress updater
558
            current_transferred = 60;
2✔
559
            current_transferrable = 501;
2✔
560
            double current_estimate = current_transferred / double(current_transferrable);
2✔
561
            const uint64_t original_transferrable = current_transferrable;
2✔
562
            progress.update(21, 26, current_transferred, current_transferrable, 1, 21 / double(26), current_estimate,
2✔
563
                            0);
2✔
564

565
            uint64_t token = progress.register_callback(
2✔
566
                [&](auto xferred, auto xferable, double ep) {
4✔
567
                    transferred = xferred;
4✔
568
                    transferrable = xferable;
4✔
569
                    upload_estimate = ep;
4✔
570
                    callback_was_called = true;
4✔
571
                },
4✔
572
                NotifierType::upload, false, 0);
2✔
573
            REQUIRE(callback_was_called);
2!
574

575
            // Now manually call the notifier handler a few times.
576
            callback_was_called = false;
2✔
577
            current_transferred = 66;
2✔
578
            current_transferrable = 912;
2✔
579
            current_estimate = current_transferred / double(current_transferrable);
2✔
580
            progress.update(25, 26, current_transferred, current_transferrable, 1, 25 / double(26), current_estimate,
2✔
581
                            0);
2✔
582
            CHECK(callback_was_called);
2!
583
            CHECK(transferred == current_transferred);
2!
584
            CHECK(transferrable == original_transferrable);
2!
585
            CHECK(upload_estimate == std::min(1.0, current_transferred / double(original_transferrable)));
2!
586

587
            // Unregister
588
            progress.unregister_callback(token);
2✔
589

590
            // Second callback: should not actually do anything.
591
            callback_was_called = false;
2✔
592
            current_transferred = 67;
2✔
593
            current_transferrable = 1228;
2✔
594
            current_estimate = current_transferred / double(current_transferrable);
2✔
595
            progress.update(199, 591, current_transferred, current_transferrable, 1, 199 / double(591),
2✔
596
                            current_estimate, 0);
2✔
597
            CHECK(!callback_was_called);
2!
598
        }
2✔
599

600
        SECTION("for multiple notifiers, different directions") {
14✔
601
            // Prime the progress updater
602
            uint64_t current_uploaded = 16;
2✔
603
            uint64_t current_uploadable = 201;
2✔
604
            uint64_t current_downloaded = 68;
2✔
605
            uint64_t current_downloadable = 182;
2✔
606
            const uint64_t original_uploadable = current_uploadable;
2✔
607
            const uint64_t original_downloadable = current_downloadable;
2✔
608
            double current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
609
            double current_download_estimate = current_downloaded / double(current_downloadable);
2✔
610
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
611
                            current_download_estimate, current_upload_estimate, 0);
2✔
612

613
            progress.register_callback(
2✔
614
                [&](auto xferred, auto xferable, double ep) {
6✔
615
                    transferred = xferred;
6✔
616
                    transferrable = xferable;
6✔
617
                    upload_estimate = ep;
6✔
618
                    callback_was_called = true;
6✔
619
                },
6✔
620
                NotifierType::upload, false, 0);
2✔
621
            REQUIRE(callback_was_called);
2!
622

623
            // Register a second notifier.
624
            bool callback_was_called_2 = false;
2✔
625
            uint64_t downloaded = 0;
2✔
626
            uint64_t downloadable = 0;
2✔
627
            progress.register_callback(
2✔
628
                [&](auto xferred, auto xferable, double ep) {
8✔
629
                    downloaded = xferred;
8✔
630
                    downloadable = xferable;
8✔
631
                    download_estimate = ep;
8✔
632
                    callback_was_called_2 = true;
8✔
633
                },
8✔
634
                NotifierType::download, false, 0);
2✔
635
            REQUIRE(callback_was_called_2);
2!
636

637
            // Now manually call the notifier handler a few times.
638
            callback_was_called = false;
2✔
639
            callback_was_called_2 = false;
2✔
640
            current_uploaded = 36;
2✔
641
            current_uploadable = 310;
2✔
642
            current_downloaded = 171;
2✔
643
            current_downloadable = 185;
2✔
644
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
645
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
646
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
647
                            current_download_estimate, current_upload_estimate, 0);
2✔
648
            CHECK(callback_was_called);
2!
649
            CHECK(transferred == current_uploaded);
2!
650
            CHECK(transferrable == original_uploadable);
2!
651
            CHECK(callback_was_called_2);
2!
652
            CHECK(downloaded == current_downloaded);
2!
653
            CHECK(downloadable == original_downloadable);
2!
654
            CHECK(upload_estimate == std::min(1.0, current_uploaded / double(original_uploadable)));
2!
655
            CHECK(download_estimate == current_download_estimate);
2!
656

657
            // Second callback, last one for the upload notifier
658
            callback_was_called = false;
2✔
659
            callback_was_called_2 = false;
2✔
660
            current_uploaded = 218;
2✔
661
            current_uploadable = 310;
2✔
662
            current_downloaded = 174;
2✔
663
            current_downloadable = 190;
2✔
664
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
665
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
666
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
667
                            current_download_estimate, current_upload_estimate, 0);
2✔
668
            CHECK(callback_was_called);
2!
669
            CHECK(transferred == current_uploaded);
2!
670
            CHECK(transferrable == original_uploadable);
2!
671
            CHECK(callback_was_called_2);
2!
672
            CHECK(downloaded == current_downloaded);
2!
673
            CHECK(downloadable == original_downloadable);
2!
674
            CHECK(upload_estimate == std::min(1.0, current_uploaded / double(original_uploadable)));
2!
675
            CHECK(download_estimate == current_download_estimate);
2!
676

677
            // Third callback, last one for the download notifier
678
            callback_was_called = false;
2✔
679
            callback_was_called_2 = false;
2✔
680
            current_uploaded = 218;
2✔
681
            current_uploadable = 310;
2✔
682
            current_downloaded = 182;
2✔
683
            current_downloadable = 196;
2✔
684
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
685
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
686
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
687
                            current_download_estimate, current_upload_estimate, 0);
2✔
688
            CHECK(!callback_was_called);
2!
689
            CHECK(callback_was_called_2);
2!
690
            CHECK(downloaded == current_downloaded);
2!
691
            CHECK(downloadable == original_downloadable);
2!
692
            CHECK(upload_estimate == std::min(1.0, current_uploaded / double(original_uploadable)));
2!
693
            CHECK(download_estimate == current_download_estimate);
2!
694

695
            // Fourth callback, last one for the download notifier
696
            callback_was_called_2 = false;
2✔
697
            current_uploaded = 220;
2✔
698
            current_uploadable = 410;
2✔
699
            current_downloaded = 192;
2✔
700
            current_downloadable = 591;
2✔
701
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
702
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
703
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
704
                            current_download_estimate, current_upload_estimate, 0);
2✔
705
            CHECK(!callback_was_called);
2!
706
            CHECK(!callback_was_called_2);
2!
707
        }
2✔
708

709
        SECTION("for multiple notifiers, same direction") {
14✔
710
            // Prime the progress updater
711
            uint64_t current_uploaded = 16;
2✔
712
            uint64_t current_uploadable = 201;
2✔
713
            uint64_t current_downloaded = 68;
2✔
714
            uint64_t current_downloadable = 182;
2✔
715
            double current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
716
            double current_download_estimate = current_downloaded / double(current_downloadable);
2✔
717

718
            const uint64_t original_downloadable = current_downloadable;
2✔
719
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
720
                            current_download_estimate, current_upload_estimate, 0);
2✔
721

722
            progress.register_callback(
2✔
723
                [&](auto xferred, auto xferable, double ep) {
6✔
724
                    transferred = xferred;
6✔
725
                    transferrable = xferable;
6✔
726
                    download_estimate = ep;
6✔
727
                    callback_was_called = true;
6✔
728
                },
6✔
729
                NotifierType::download, false, 0);
2✔
730
            REQUIRE(callback_was_called);
2!
731

732
            // Now manually call the notifier handler a few times.
733
            callback_was_called = false;
2✔
734
            current_uploaded = 36;
2✔
735
            current_uploadable = 310;
2✔
736
            current_downloaded = 171;
2✔
737
            current_downloadable = 185;
2✔
738
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
739
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
740

741
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
742
                            current_download_estimate, current_upload_estimate, 0);
2✔
743
            CHECK(callback_was_called);
2!
744
            CHECK(transferred == current_downloaded);
2!
745
            CHECK(transferrable == original_downloadable);
2!
746

747
            // Register a second notifier.
748
            bool callback_was_called_2 = false;
2✔
749
            uint64_t downloaded = 0;
2✔
750
            uint64_t downloadable = 0;
2✔
751
            const uint64_t original_downloadable_2 = current_downloadable;
2✔
752
            progress.register_callback(
2✔
753
                [&](auto xferred, auto xferable, double ep) {
6✔
754
                    downloaded = xferred;
6✔
755
                    downloadable = xferable;
6✔
756
                    download_estimate = ep;
6✔
757
                    callback_was_called_2 = true;
6✔
758
                },
6✔
759
                NotifierType::download, false, 0);
2✔
760
            REQUIRE(callback_was_called_2);
2!
761

762
            // Second callback, last one for first notifier
763
            callback_was_called = false;
2✔
764
            callback_was_called_2 = false;
2✔
765
            current_uploaded = 36;
2✔
766
            current_uploadable = 310;
2✔
767
            current_downloaded = 182;
2✔
768
            current_downloadable = 190;
2✔
769
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
770
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
771
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
772
                            current_download_estimate, current_upload_estimate, 0);
2✔
773
            CHECK(callback_was_called);
2!
774
            CHECK(transferred == current_downloaded);
2!
775
            CHECK(transferrable == original_downloadable);
2!
776
            CHECK(callback_was_called_2);
2!
777
            CHECK(downloaded == current_downloaded);
2!
778
            CHECK(downloadable == original_downloadable_2);
2!
779
            CHECK(download_estimate == current_download_estimate);
2!
780

781
            // Third callback, last one for second notifier
782
            callback_was_called = false;
2✔
783
            callback_was_called_2 = false;
2✔
784
            current_uploaded = 36;
2✔
785
            current_uploadable = 310;
2✔
786
            current_downloaded = 189;
2✔
787
            current_downloadable = 250;
2✔
788
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
789
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
790
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
791
                            current_download_estimate, current_upload_estimate, 0);
2✔
792
            CHECK(!callback_was_called);
2!
793
            CHECK(callback_was_called_2);
2!
794
            CHECK(downloaded == current_downloaded);
2!
795
            CHECK(downloadable == original_downloadable_2);
2!
796
            CHECK(download_estimate == current_download_estimate);
2!
797

798
            // Fourth callback
799
            callback_was_called_2 = false;
2✔
800
            current_uploaded = 36;
2✔
801
            current_uploadable = 310;
2✔
802
            current_downloaded = 201;
2✔
803
            current_downloadable = 289;
2✔
804
            current_upload_estimate = current_uploaded / double(current_uploadable);
2✔
805
            current_download_estimate = current_downloaded / double(current_downloadable);
2✔
806
            progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1,
2✔
807
                            current_download_estimate, current_upload_estimate, 0);
2✔
808
            CHECK(!callback_was_called_2);
2!
809
        }
2✔
810

811
        SECTION("download notifiers handle transferrable decreasing") {
14✔
812
            // Prime the progress updater
813
            current_transferred = 60;
2✔
814
            current_transferrable = 501;
2✔
815
            const uint64_t original_transferrable = current_transferrable;
2✔
816
            double current_estimate = current_transferred / double(current_transferrable);
2✔
817
            progress.update(current_transferred, current_transferrable, 21, 26, 1, current_estimate, 21 / double(26),
2✔
818
                            0);
2✔
819

820
            progress.register_callback(
2✔
821
                [&](auto xferred, auto xferable, double ep) {
8✔
822
                    transferred = xferred;
8✔
823
                    transferrable = xferable;
8✔
824
                    callback_was_called = true;
8✔
825
                    download_estimate = ep;
8✔
826
                },
8✔
827
                NotifierType::download, false, 0);
2✔
828
            REQUIRE(callback_was_called);
2!
829

830
            // Download some data but also drop the total. transferrable should
831
            // update because it decreased.
832
            callback_was_called = false;
2✔
833
            current_transferred = 160;
2✔
834
            current_transferrable = 451;
2✔
835
            current_estimate = current_transferred / double(current_transferrable);
2✔
836
            progress.update(current_transferred, current_transferrable, 25, 26, 1, current_estimate, 26 / double(26),
2✔
837
                            0);
2✔
838
            CHECK(callback_was_called);
2!
839
            CHECK(transferred == current_transferred);
2!
840
            CHECK(transferrable == current_transferrable);
2!
841
            CHECK(current_estimate == download_estimate);
2!
842

843
            // Increasing current_transferrable should not increase transferrable
844
            const uint64_t previous_transferrable = current_transferrable;
2✔
845
            callback_was_called = false;
2✔
846
            current_transferrable = 1000;
2✔
847
            current_estimate = current_transferred / double(current_transferrable);
2✔
848
            progress.update(current_transferred, current_transferrable, 68, 191, 1, current_estimate,
2✔
849
                            68 / double(191), 0);
2✔
850
            CHECK(callback_was_called);
2!
851
            CHECK(transferred == current_transferred);
2!
852
            CHECK(transferrable == previous_transferrable);
2!
853
            CHECK(download_estimate == current_estimate);
2!
854

855
            // Transferrable dropping to be equal to transferred should notify
856
            // and then expire the notifier
857
            callback_was_called = false;
2✔
858
            current_transferred = 200;
2✔
859
            current_transferrable = current_transferred;
2✔
860
            current_estimate = current_transferred / double(current_transferrable);
2✔
861
            progress.update(current_transferred, current_transferrable, 191, 192, 1, current_estimate,
2✔
862
                            191 / double(192), 0);
2✔
863
            CHECK(callback_was_called);
2!
864
            CHECK(transferred == current_transferred);
2!
865
            CHECK(transferrable == current_transferred);
2!
866
            CHECK(current_estimate == download_estimate);
2!
867

868
            // The notifier should be unregistered at this point, and not fire.
869
            callback_was_called = false;
2✔
870
            current_transferred = original_transferrable + 250;
2✔
871
            current_transferrable = 1228;
2✔
872
            current_estimate = current_transferred / double(current_transferrable);
2✔
873

874
            progress.update(current_transferred, current_transferrable, 199, 591, 1, current_estimate,
2✔
875
                            199 / double(591), 0);
2✔
876
            CHECK(!callback_was_called);
2!
877
        }
2✔
878
    }
14✔
879

880
    SECTION("flx streaming notifiers") {
44✔
881
        // clang-format off
882
        TestValues test_values = GENERATE(
6✔
883
            // resgisters at the begining and should see all entries.
884
            TestValues{{
6✔
885
                TestInputValue{TestInputValue::IsRegistration{}},
6✔
886
                TestInputValue{0, 0, 0, 0},
6✔
887
                TestInputValue{0, 1, 200, 200},
6✔
888
                TestInputValue{1, 0.2, 300, 600},
6✔
889
                TestInputValue{1, 0.4, 400, 600},
6✔
890
                TestInputValue{1, 0.8, 600, 700},
6✔
891
                TestInputValue{1, 1, 700, 700},
6✔
892
                TestInputValue{2, 0.3, 800, 1000},
6✔
893
                TestInputValue{2, 0.6, 900, 1000},
6✔
894
                TestInputValue{2, 1, 1000, 1000},
6✔
895
            }, {
6✔
896
                ProgressEntry{0, 0, 0},
6✔
897
                ProgressEntry{200, 200, 1},
6✔
898
                ProgressEntry{300, 600, 0.2},
6✔
899
                ProgressEntry{400, 600, 0.4},
6✔
900
                ProgressEntry{600, 700, 0.8},
6✔
901
                ProgressEntry{700, 700, 1},
6✔
902
                ProgressEntry{800, 1000, 0.3},
6✔
903
                ProgressEntry{900, 1000, 0.6},
6✔
904
                ProgressEntry{1000, 1000, 1},
6✔
905
            }, 1},
6✔
906
            TestValues{{
6✔
907
                TestInputValue{1, 0.2, 300, 600},
6✔
908
                TestInputValue{1, 0.4, 400, 600},
6✔
909
                TestInputValue{TestInputValue::IsRegistration{}},
6✔
910
                TestInputValue{1, 0.8, 600, 700},
6✔
911
                TestInputValue{1, 1, 700, 700},
6✔
912
            }, {
6✔
913
                ProgressEntry{400, 600, 0.4},
6✔
914
                ProgressEntry{600, 700, 0.8},
6✔
915
                ProgressEntry{700, 700, 1.0},
6✔
916
            }, 1},
6✔
917
            // registers for a query version that's already up-to-date - should get an immediate update
918
            // with a progress estimate of 1 and whatever the current transferred/transferrable numbers are
919
            TestValues{{
6✔
920
                TestInputValue{2, 0.5, 800, 900},
6✔
921
                TestInputValue{2, 1, 900, 900},
6✔
922
                TestInputValue{TestInputValue::IsRegistration{}},
6✔
923
                TestInputValue{2, 1, 1000, 1000}
6✔
924
            }, {
6✔
925
                ProgressEntry{900, 900, 1},
6✔
926
                ProgressEntry{1000, 1000, 1},
6✔
927
            }, 1}
6✔
928
        );
6✔
929
        // clang-format on
930

931
        auto logger = util::Logger::get_default_logger();
6✔
932
        auto progress_output = util::make_bind<WaitableProgress>(logger, "flx non-streaming download");
6✔
933

934
        uint64_t snapshot = 1;
6✔
935
        for (const auto& input_val : test_values.input_values) {
38✔
936
            if (input_val.is_registration) {
38✔
937
                progress.register_callback(progress_output->make_cb(), NotifierType::download, true,
6✔
938
                                           test_values.registered_at_query_version);
6✔
939
                continue;
6✔
940
            }
6✔
941
            progress.update(input_val.transferred, input_val.transferrable, 0, 0, ++snapshot, input_val.cur_estimate,
32✔
942
                            0.0, input_val.query_version);
32✔
943
        }
32✔
944

945
        const auto output_values = progress_output->wait_for_full_sync();
6✔
946

947
        REQUIRE_THAT(output_values, Catch::Matchers::Equals(test_values.expected_values));
6✔
948
    }
6✔
949

950
    SECTION("flx non-streaming notifiers") {
44✔
951
        // clang-format off
952
        TestValues test_values = GENERATE(
8✔
953
            // registers for query version 1 on an empty realm - should see the full progression
954
            // of query version 1 and nothing else.
955
            TestValues{{
8✔
956
                TestInputValue{TestInputValue::IsRegistration{}},
8✔
957
                TestInputValue{0, 0, 0, 0},
8✔
958
                TestInputValue{0, 1, 200, 200},
8✔
959
                TestInputValue{1, 0.2, 300, 600},
8✔
960
                TestInputValue{1, 0.4, 400, 600},
8✔
961
                TestInputValue{1, 0.8, 600, 700},
8✔
962
                TestInputValue{1, 1, 700, 700},
8✔
963
                TestInputValue{2, 0.3, 800, 1000},
8✔
964
                TestInputValue{2, 0.6, 900, 1000},
8✔
965
                TestInputValue{2, 1, 1000, 1000},
8✔
966
            }, {
8✔
967
                ProgressEntry{300, 600, 0.2},
8✔
968
                ProgressEntry{400, 600, 0.4},
8✔
969
                ProgressEntry{600, 600, 0.8},
8✔
970
                ProgressEntry{700, 600, 1.0},
8✔
971
            }, 1},
8✔
972
            // registers a notifier in the middle of syncing the target query version
973
            TestValues{{
8✔
974
                TestInputValue{1, 0.2, 300, 600},
8✔
975
                TestInputValue{1, 0.4, 400, 600},
8✔
976
                TestInputValue{TestInputValue::IsRegistration{}},
8✔
977
                TestInputValue{1, 0.8, 600, 700},
8✔
978
                TestInputValue{1, 1, 700, 700},
8✔
979
                // There's also a progress notification for a regular steady state
980
                // download message that gets ignored because we're already up-to-date
981
                TestInputValue{1, 1, 800, 800},
8✔
982
            }, {
8✔
983
                ProgressEntry{400, 600, 0.4},
8✔
984
                ProgressEntry{600, 600, 0.8},
8✔
985
                ProgressEntry{700, 600, 1.0},
8✔
986
            }, 1},
8✔
987
            // registers for a notifier for a later query version - should only see notifications
988
            // for downloads greater than the requested query version
989
            TestValues{{
8✔
990

991
                TestInputValue{TestInputValue::IsRegistration{}},
8✔
992
                TestInputValue{1, 0.8, 700, 700},
8✔
993
                TestInputValue{1, 1, 700, 700},
8✔
994
                TestInputValue{3, 0.5, 800, 900},
8✔
995
                TestInputValue{3, 1, 900, 900},
8✔
996
            }, {
8✔
997
                ProgressEntry{800, 900, 0.5},
8✔
998
                ProgressEntry{900, 900, 1},
8✔
999
            }, 2},
8✔
1000
            // registers for a query version that's already up-to-date - should get an immediate update
1001
            // with a progress estimate of 1 and whatever the current transferred/transferrable numbers are
1002
            TestValues{{
8✔
1003
                TestInputValue{2, 0.5, 800, 900},
8✔
1004
                TestInputValue{2, 1, 900, 900},
8✔
1005
                TestInputValue{TestInputValue::IsRegistration{}},
8✔
1006
            }, {
8✔
1007
                ProgressEntry{900, 900, 1},
8✔
1008
            }, 1}
8✔
1009
        );
8✔
1010
        // clang-format on
1011

1012
        auto logger = util::Logger::get_default_logger();
8✔
1013
        auto progress_output = util::make_bind<WaitableProgress>(logger, "flx non-streaming download");
8✔
1014

1015
        uint64_t snapshot = 1;
8✔
1016
        for (const auto& input_val : test_values.input_values) {
48✔
1017
            if (input_val.is_registration) {
48✔
1018
                progress.register_callback(progress_output->make_cb(), NotifierType::download, false,
8✔
1019
                                           test_values.registered_at_query_version);
8✔
1020
                continue;
8✔
1021
            }
8✔
1022
            progress.update(input_val.transferred, input_val.transferrable, 0, 0, ++snapshot, input_val.cur_estimate,
40✔
1023
                            0.0, input_val.query_version);
40✔
1024
        }
40✔
1025

1026
        const auto output_values = progress_output->wait_for_full_sync();
8✔
1027

1028
        REQUIRE_THAT(output_values, Catch::Matchers::Equals(test_values.expected_values));
8✔
1029
    }
8✔
1030
}
44✔
1031

1032
#if REALM_ENABLE_AUTH_TESTS
1033

1034
struct TestSetup {
1035
    TableRef get_table(const SharedRealm& r)
1036
    {
26✔
1037
        return r->read_group().get_table("class_" + table_name);
26✔
1038
    }
26✔
1039

1040
    size_t add_objects(SharedRealm& r, int num)
1041
    {
16✔
1042
        CppContext ctx(r);
16✔
1043
        for (int i = 0; i < num; ++i) {
116✔
1044
            // use specifically separate transactions for a bit of history
1045
            r->begin_transaction();
100✔
1046
            Object::create(ctx, r, StringData(table_name), std::any(make_one(i)));
100✔
1047
            r->commit_transaction();
100✔
1048
        }
100✔
1049
        return get_table(r)->size();
16✔
1050
    }
16✔
1051

1052
    virtual SyncTestFile make_config() = 0;
1053
    virtual AnyDict make_one(int64_t idx) = 0;
1054

1055
    std::string table_name;
1056
};
1057

1058
struct PBS : TestSetup {
1059
    PBS()
1060
    {
8✔
1061
        table_name = "Dog";
8✔
1062
    }
8✔
1063

1064
    SyncTestFile make_config() override
1065
    {
10✔
1066
        return SyncTestFile(session.app()->current_user(), partition, get_default_schema());
10✔
1067
    }
10✔
1068

1069
    AnyDict make_one(int64_t /* idx */) override
1070
    {
50✔
1071
        return AnyDict{{"_id", std::any(ObjectId::gen())},
50✔
1072
                       {"breed", std::string("bulldog")},
50✔
1073
                       {"name", random_string(1024 * 1024)}};
50✔
1074
    }
50✔
1075

1076
    TestAppSession session;
1077
    const std::string partition = random_string(100);
1078
};
1079

1080
struct FLX : TestSetup {
1081
    FLX(const std::string& app_id = "flx_sync_progress")
1082
        : harness(app_id)
4✔
1083
    {
8✔
1084
        table_name = harness.schema().begin()->name;
8✔
1085
    }
8✔
1086

1087
    SyncTestFile make_config() override
1088
    {
10✔
1089
        auto config = harness.make_test_file();
10✔
1090
        add_subscription(*config.sync_config);
10✔
1091
        return config;
10✔
1092
    }
10✔
1093

1094
    void add_subscription(SyncConfig& config)
1095
    {
10✔
1096
        config.rerun_init_subscription_on_open = true;
10✔
1097
        config.subscription_initializer = [&](SharedRealm&& realm) {
10✔
1098
            add_subscription(realm);
10✔
1099
        };
10✔
1100
    }
10✔
1101

1102
    void add_subscription(SharedRealm& realm)
1103
    {
10✔
1104
        auto sub = realm->get_latest_subscription_set().make_mutable_copy();
10✔
1105
        sub.insert_or_assign(Query(get_table(realm)));
10✔
1106
        sub.commit();
10✔
1107
    }
10✔
1108

1109
    AnyDict make_one(int64_t idx) override
1110
    {
50✔
1111
        return AnyDict{{"_id", ObjectId::gen()},
50✔
1112
                       {"queryable_int_field", idx},
50✔
1113
                       {"queryable_str_field", random_string(1024 * 1024)}};
50✔
1114
    }
50✔
1115

1116
    FLXSyncTestHarness harness;
1117
};
1118

1119
struct ProgressIncreasesMatcher : Catch::Matchers::MatcherGenericBase {
1120
    enum MatchMode { ByteCountOnly, All };
1121
    ProgressIncreasesMatcher() = default;
16✔
1122
    explicit ProgressIncreasesMatcher(MatchMode mode)
1123
        : m_mode(mode)
4✔
1124
    {
8✔
1125
    }
8✔
1126

1127
    bool match(std::vector<ProgressEntry> const& entries) const
1128
    {
24✔
1129
        if (entries.size() < 1) {
24✔
NEW
1130
            return false;
×
NEW
1131
        }
×
1132

1133
        auto last = std::ref(entries.front());
24✔
1134
        for (size_t i = 1; i < entries.size(); ++i) {
168✔
1135
            ProgressEntry const& cur = entries[i];
144✔
1136
            if (cur.transferred < last.get().transferred) {
144✔
NEW
1137
                return false;
×
NEW
1138
            }
×
1139
            if (m_mode == All && cur.estimate < last.get().estimate) {
144✔
NEW
1140
                return false;
×
NEW
1141
            }
×
1142
            last = cur;
144✔
1143
        }
144✔
1144
        return true;
24✔
1145
    }
24✔
1146

1147
    std::string describe() const override
UNCOV
1148
    {
×
NEW
1149
        return "progress notifications all increase";
×
UNCOV
1150
    }
×
1151

1152
private:
1153
    MatchMode m_mode = All;
1154
};
1155

1156
TEMPLATE_TEST_CASE("progress notifications fire immediately when fully caught up", "[baas][progress][sync]", PBS, FLX)
1157
{
12✔
1158
    TestType pbs_setup;
12✔
1159
    auto logger = util::Logger::get_default_logger();
12✔
1160

1161
    auto validate_noop_entry = [&](const std::vector<ProgressEntry>& entries, std::string context) {
12✔
1162
        UNSCOPED_INFO("validating noop non-streaming entry " << context);
12✔
1163
        REQUIRE(entries.size() == 1);
12!
1164
        const auto& entry = entries.front();
12✔
1165
        REQUIRE(entry.transferred >= entry.transferrable);
12!
1166
        REQUIRE(entry.estimate >= 1.0);
12!
1167
    };
12✔
1168

1169
    SECTION("empty async open results in progress notification") {
12✔
1170
        auto config = pbs_setup.make_config();
4✔
1171
        auto async_open_task = Realm::get_synchronized_realm(config);
4✔
1172
        auto async_open_progress = util::make_bind<WaitableProgress>(logger, "async open non-streaming progress ");
4✔
1173
        async_open_task->register_download_progress_notifier(async_open_progress->make_cb());
4✔
1174
        auto [promise, future] = util::make_promise_future<ThreadSafeReference>();
4✔
1175
        async_open_task->start(
4✔
1176
            [promise = std::move(promise)](ThreadSafeReference ref, std::exception_ptr ouch) mutable {
4✔
1177
                if (ouch) {
4✔
NEW
1178
                    try {
×
NEW
1179
                        std::rethrow_exception(ouch);
×
NEW
1180
                    }
×
NEW
1181
                    catch (...) {
×
NEW
1182
                        promise.set_error(exception_to_status());
×
NEW
1183
                    }
×
NEW
1184
                }
×
1185
                else {
4✔
1186
                    promise.emplace_value(std::move(ref));
4✔
1187
                }
4✔
1188
            });
4✔
1189

1190
        auto realm = Realm::get_shared_realm(std::move(future).get());
4✔
1191
        auto noop_download_progress = util::make_bind<WaitableProgress>(logger, "non-streaming download ");
4✔
1192
        auto noop_token = realm->sync_session()->register_progress_notifier(
4✔
1193
            noop_download_progress->make_cb(), SyncSession::ProgressDirection::download, false);
4✔
1194
        // The registration token for a non-streaming notifier that was expired at registration time
1195
        // is zero because it's invoked immediately and never registered for further notifications.
1196
        CHECK(noop_token == 0);
4!
1197

1198
        auto async_open_entries = async_open_progress->wait_for_full_sync();
4✔
1199
        REQUIRE_THAT(async_open_entries, ProgressIncreasesMatcher{});
4✔
1200
        validate_noop_entry(noop_download_progress->wait_for_full_sync(), "noop_download_progress");
4✔
1201
    }
4✔
1202

1203
    SECTION("synchronous open then waiting for download then noop notification") {
12✔
1204
        {
4✔
1205
            auto fill_data_config = pbs_setup.make_config();
4✔
1206
            auto fill_data_realm = Realm::get_shared_realm(fill_data_config);
4✔
1207
            pbs_setup.add_objects(fill_data_realm, 5);
4✔
1208
            wait_for_upload(*fill_data_realm);
4✔
1209
        }
4✔
1210

1211
        auto config = pbs_setup.make_config();
4✔
1212
        auto realm = Realm::get_shared_realm(config);
4✔
1213
        auto initial_progress = util::make_bind<WaitableProgress>(logger, "streaming initial progress ");
4✔
1214
        realm->sync_session()->register_progress_notifier(initial_progress->make_cb(), NotifierType::download, true);
4✔
1215

1216
        auto initial_entries = initial_progress->wait_for_full_sync();
4✔
1217
        REQUIRE(!initial_entries.empty());
4!
1218
        REQUIRE_THAT(initial_entries, ProgressIncreasesMatcher{});
4✔
1219

1220
        auto noop_download_progress = util::make_bind<WaitableProgress>(logger, "non-streaming noop download ");
4✔
1221
        auto noop_token = realm->sync_session()->register_progress_notifier(
4✔
1222
            noop_download_progress->make_cb(), SyncSession::ProgressDirection::download, false);
4✔
1223
        // The registration token for a non-streaming notifier that was expired at registration time
1224
        // is zero because it's invoked immediately and never registered for further notifications.
1225
        CHECK(noop_token == 0);
4!
1226

1227
        validate_noop_entry(noop_download_progress->wait_for_full_sync(), "noop_download_progress");
4✔
1228
    }
4✔
1229

1230
    SECTION("uploads") {
12✔
1231
        auto config = pbs_setup.make_config();
4✔
1232
        auto realm = Realm::get_shared_realm(config);
4✔
1233
        auto initial_progress = util::make_bind<WaitableProgress>(logger, "non-streaming initial progress ");
4✔
1234

1235
        pbs_setup.add_objects(realm, 5);
4✔
1236

1237
        auto token = realm->sync_session()->register_progress_notifier(initial_progress->make_cb(),
4✔
1238
                                                                       NotifierType::upload, false);
4✔
1239
        auto initial_entries = initial_progress->wait_for_full_sync();
4✔
1240
        REQUIRE(!initial_entries.empty());
4!
1241
        REQUIRE_THAT(initial_entries, ProgressIncreasesMatcher{});
4✔
1242
        realm->sync_session()->unregister_progress_notifier(token);
4✔
1243

1244
        // it's possible that we've reached full synchronization in the progress notifier, but because
1245
        // of the way non-streaming notifiers work, the transferable may be higher for the next
1246
        // non-streaming notifier than for the one that just finished. So we explicitly wait for
1247
        // all uploads to complete to check that registering a noop notifier here is actually a noop.
1248
        wait_for_upload(*realm);
4✔
1249

1250
        auto noop_upload_progress = util::make_bind<WaitableProgress>(logger, "non-streaming upload ");
4✔
1251
        auto noop_token = realm->sync_session()->register_progress_notifier(
4✔
1252
            noop_upload_progress->make_cb(), SyncSession::ProgressDirection::upload, false);
4✔
1253
        // The registration token for a non-streaming notifier that was expired at registration time
1254
        // is zero because it's invoked immediately and never registered for further notifications.
1255
        CHECK(noop_token == 0);
4!
1256

1257
        validate_noop_entry(noop_upload_progress->wait_for_full_sync(), "noop_upload_progress");
4✔
1258
    }
4✔
1259
}
12✔
1260

1261

1262
TEMPLATE_TEST_CASE("sync progress: upload progress", "[sync][baas][progress]", PBS, FLX)
1263
{
4✔
1264
    TestType setup;
4✔
1265

1266
    auto realm = Realm::get_shared_realm(setup.make_config());
4✔
1267
    auto sync_session = realm->sync_session();
4✔
1268
    auto logger = util::Logger::get_default_logger();
4✔
1269
    auto non_streaming_progress = util::make_bind<WaitableProgress>(logger, "non-streaming upload ");
4✔
1270
    auto streaming_progress = util::make_bind<WaitableProgress>(logger, "streaming upload ");
4✔
1271

1272
    // There is a race between creating the objects and registering the non-streaming notifier
1273
    // since
1274
    sync_session->pause();
4✔
1275

1276
    setup.add_objects(realm, 10);
4✔
1277
    sync_session->register_progress_notifier(non_streaming_progress->make_cb(), NotifierType::upload, false);
4✔
1278
    sync_session->register_progress_notifier(streaming_progress->make_cb(), NotifierType::upload, true);
4✔
1279

1280
    sync_session->resume();
4✔
1281
    wait_for_upload(*realm);
4✔
1282

1283
    auto streaming_entries = streaming_progress->wait_for_full_sync();
4✔
1284
    auto non_streaming_entries = non_streaming_progress->wait_for_full_sync();
4✔
1285

1286
    REQUIRE(!streaming_entries.empty());
4!
1287
    REQUIRE(!non_streaming_entries.empty());
4!
1288
    REQUIRE_THAT(non_streaming_entries, ProgressIncreasesMatcher{});
4✔
1289
    REQUIRE_THAT(streaming_entries, ProgressIncreasesMatcher{ProgressIncreasesMatcher::ByteCountOnly});
4✔
1290

1291
    setup.add_objects(realm, 5);
4✔
1292
    wait_for_upload(*realm);
4✔
1293

1294
    streaming_entries = streaming_progress->wait_for_full_sync();
4✔
1295
    REQUIRE_THAT(streaming_entries, ProgressIncreasesMatcher{ProgressIncreasesMatcher::ByteCountOnly});
4✔
1296
    REQUIRE(non_streaming_progress->empty());
4!
1297
}
4✔
1298

1299
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc