• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 520

01 Aug 2025 01:44PM UTC coverage: 64.997% (+0.02%) from 64.975%
520

push

jenkins

web-flow
CBG-4734: fix for race when removing a value that is being loaded into rev cache (#7640)

32 of 43 new or added lines in 5 files covered. (74.42%)

11 existing lines in 4 files now uncovered.

40054 of 61624 relevant lines covered (65.0%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.58
/db/active_replicator_checkpointer.go
1
/*
2
Copyright 2020-Present Couchbase, Inc.
3

4
Use of this software is governed by the Business Source License included in
5
the file licenses/BSL-Couchbase.txt.  As of the Change Date specified in that
6
file, in accordance with the Business Source License, use of this software will
7
be governed by the Apache License, Version 2.0, included in the file
8
licenses/APL2.txt.
9
*/
10

11
package db
12

13
import (
14
        "context"
15
        "errors"
16
        "sort"
17
        "strings"
18
        "sync"
19
        "time"
20

21
        "github.com/couchbase/go-blip"
22
        "github.com/couchbase/sync_gateway/base"
23
)
24

25
// defaultExpectedSeqCompactionThreshold is the value NewCheckpointer assigns to
// Checkpointer.expectedSeqCompactionThreshold. When more than this many expected sequences remain
// after the checkpointable prefix has been trimmed, _updateCheckpointLists tries to compact runs of
// already-processed sequences.
const defaultExpectedSeqCompactionThreshold = 100
26

27
// Checkpointer implements replicator checkpointing, by keeping two lists of sequences. Those which we expect to be processing revs for (either push or pull), and a map for those which we have done so on.
// Periodically (based on a time interval), these two lists are used to calculate the highest sequence number which we've not had a gap for yet, and send a SetCheckpoint message for this sequence.
//
// Each Checkpointer serves a single collection (see collectionIdx). The calculated sequence is written both
// to a local checkpoint doc in collectionDataStore and to the remote peer, and is kept in lastCheckpointSeq.
type Checkpointer struct {
	// clientID names this replication's checkpoints: it is the Client of remote checkpoint requests and is
	// appended to CheckpointDocIDPrefix to form the local checkpoint doc ID. configHash is stored in every
	// checkpoint written; setLastCheckpointSeq ignores stored checkpoints whose hash differs from it.
	// blipSender carries the remote checkpoint requests. localDocExpirySecs is passed through to the local
	// checkpoint doc reads/writes. checkpointInterval is the ticker period used by Start; a value <= 0
	// disables time-based checkpointing.
	clientID            string
	configHash          string
	blipSender          *blip.Sender
	metadataStore       base.DataStore // metadataStore is where non-collection specific metadata is stored (e.g. replication status documents)
	collectionDataStore base.DataStore // collectionDataStore is where the checkpoints are stored for the collection
	localDocExpirySecs  int
	checkpointInterval  time.Duration
	// lock guards the expectedSeqs slice, and processedSeqs map. The Add* methods and Stats also hold it
	// while touching idAndRevLookup and stats, and CheckpointNow holds it for the whole checkpoint write.
	// NOTE(review): setLastCheckpointSeq updates lastCheckpointSeq, the last*CheckpointRevID fields and
	// stats without taking lock; presumably it only runs before Start - confirm.
	lock sync.Mutex
	// expectedSeqs is the list of sequence IDs we expect to process revs for. Sequences are appended in
	// arrival order; _calculateSafeExpectedSeqsIdx sorts it in place before each checkpoint calculation.
	expectedSeqs []SequenceID
	// processedSeqs is the set (map with empty values) of sequence IDs we've processed revs for
	processedSeqs map[SequenceID]struct{}
	// idAndRevLookup is a temporary map of DocID/RevID pair to sequence number,
	// to handle cases where we receive a message that doesn't contain a sequence.
	// Entries are added by AddExpectedSeqIDAndRevs and removed by AddProcessedSeqIDAndRev.
	idAndRevLookup map[IDAndRev]SequenceID
	// ctx is used to stop the checkpointer goroutine. Once it is done, the Add* methods return without
	// recording anything. It is also the context used for logging and remote checkpoint requests.
	ctx context.Context
	// lastRemoteCheckpointRevID is the last known remote checkpoint RevID, sent as the RevID of the next remote write.
	lastRemoteCheckpointRevID string
	// lastLocalCheckpointRevID is the last known local checkpoint RevID, passed as the rev of the next local write.
	lastLocalCheckpointRevID string
	// lastCheckpointSeq is the last checkpointed sequence. It is initialised by setLastCheckpointSeq and is
	// returned by _calculateSafeProcessedSeq when no newer sequence is checkpointable.
	lastCheckpointSeq SequenceID
	// collectionIdx is the GetCollections index of the collection we're checkpointing for; it is sent with
	// remote checkpoint requests.
	collectionIdx *int

	// expectedSeqCompactionThreshold is the number of expected sequences that we'll tolerate before considering compacting away already processed sequences
	// time vs. space complexity tradeoff, since we need to iterate over the expectedSeqs slice to compact it
	expectedSeqCompactionThreshold int

	// stats holds this checkpointer's counters and sequence-length gauges.
	stats CheckpointerStats

	// closeWg waits for the time-based checkpointer goroutine to finish.
	closeWg sync.WaitGroup
}
66

67
// CheckpointerStats holds counters and gauges describing checkpointer activity.
// The int64 counters are incremented by the Checkpointer itself. The *base.SgwIntStat gauges are supplied
// by NewCheckpointer from the replication's stats map and are Set on every _updateCheckpointLists run.
type CheckpointerStats struct {
	ExpectedSequenceCount           int64            // sequences passed to AddExpectedSeqs / AddExpectedSeqIDAndRevs
	ExpectedSequenceLen             *base.SgwIntStat // len(expectedSeqs) before cleanup
	ExpectedSequenceLenPostCleanup  *base.SgwIntStat // len(expectedSeqs) after cleanup
	ProcessedSequenceCount          int64            // calls recorded by AddProcessedSeq / AddProcessedSeqIDAndRev
	ProcessedSequenceLen            *base.SgwIntStat // len(processedSeqs) before cleanup
	ProcessedSequenceLenPostCleanup *base.SgwIntStat // len(processedSeqs) after cleanup
	AlreadyKnownSequenceCount       int64            // sequences passed to AddAlreadyKnownSeq
	SetCheckpointCount              int64            // successful _setCheckpoints calls
	GetCheckpointHitCount           int64            // setLastCheckpointSeq found a usable stored checkpoint
	GetCheckpointMissCount          int64            // setLastCheckpointSeq fell back to the zero sequence
}
79

80
func NewCheckpointer(ctx context.Context, metadataStore, collectionDataStore base.DataStore, clientID string, configHash string, blipSender *blip.Sender, replicatorConfig *ActiveReplicatorConfig, collectionIdx *int) *Checkpointer {
1✔
81
        return &Checkpointer{
1✔
82
                clientID:            clientID,
1✔
83
                configHash:          configHash,
1✔
84
                blipSender:          blipSender,
1✔
85
                metadataStore:       metadataStore,
1✔
86
                collectionDataStore: collectionDataStore,
1✔
87
                localDocExpirySecs:  int(replicatorConfig.ActiveDB.Options.LocalDocExpirySecs),
1✔
88
                expectedSeqs:        make([]SequenceID, 0),
1✔
89
                processedSeqs:       make(map[SequenceID]struct{}),
1✔
90
                idAndRevLookup:      make(map[IDAndRev]SequenceID),
1✔
91
                checkpointInterval:  replicatorConfig.CheckpointInterval,
1✔
92
                ctx:                 ctx,
1✔
93
                stats: CheckpointerStats{
1✔
94
                        ProcessedSequenceLen:            replicatorConfig.ReplicationStatsMap.ProcessedSequenceLen,
1✔
95
                        ProcessedSequenceLenPostCleanup: replicatorConfig.ReplicationStatsMap.ProcessedSequenceLenPostCleanup,
1✔
96
                        ExpectedSequenceLen:             replicatorConfig.ReplicationStatsMap.ExpectedSequenceLen,
1✔
97
                        ExpectedSequenceLenPostCleanup:  replicatorConfig.ReplicationStatsMap.ExpectedSequenceLenPostCleanup,
1✔
98
                },
1✔
99
                expectedSeqCompactionThreshold: defaultExpectedSeqCompactionThreshold,
1✔
100
                collectionIdx:                  collectionIdx,
1✔
101
        }
1✔
102
}
1✔
103

104
func (c *Checkpointer) AddAlreadyKnownSeq(seq ...SequenceID) {
1✔
105
        select {
1✔
106
        case <-c.ctx.Done():
1✔
107
                base.TracefCtx(c.ctx, base.KeyReplicate, "Inside AddAlreadyKnownSeq and context has been cancelled")
1✔
108
                // replicator already closed, bail out of checkpointing work
1✔
109
                return
1✔
110
        default:
1✔
111
        }
112

113
        c.lock.Lock()
1✔
114
        c.expectedSeqs = append(c.expectedSeqs, seq...)
1✔
115
        for _, seq := range seq {
2✔
116
                c.processedSeqs[seq] = struct{}{}
1✔
117
        }
1✔
118
        c.stats.AlreadyKnownSequenceCount += int64(len(seq))
1✔
119
        c.lock.Unlock()
1✔
120
}
121

122
func (c *Checkpointer) AddProcessedSeq(seq SequenceID) {
1✔
123
        select {
1✔
124
        case <-c.ctx.Done():
×
125
                base.TracefCtx(c.ctx, base.KeyReplicate, "Inside AddProcessedSeq and context has been cancelled")
×
126
                // replicator already closed, bail out of checkpointing work
×
127
                return
×
128
        default:
1✔
129
        }
130

131
        c.lock.Lock()
1✔
132
        c.processedSeqs[seq] = struct{}{}
1✔
133
        c.stats.ProcessedSequenceCount++
1✔
134
        c.lock.Unlock()
1✔
135
}
136

137
func (c *Checkpointer) AddProcessedSeqIDAndRev(seq *SequenceID, idAndRev IDAndRev) {
1✔
138
        select {
1✔
139
        case <-c.ctx.Done():
1✔
140
                base.TracefCtx(c.ctx, base.KeyReplicate, "Inside AddProcessedSeqIDAndRev and context has been cancelled")
1✔
141
                // replicator already closed, bail out of checkpointing work
1✔
142
                return
1✔
143
        default:
1✔
144
        }
145

146
        c.lock.Lock()
1✔
147

1✔
148
        if seq == nil {
1✔
149
                foundSeq, ok := c.idAndRevLookup[idAndRev]
×
150
                if !ok {
×
151
                        base.WarnfCtx(c.ctx, "Unable to find matching sequence for %q / %q", base.UD(idAndRev.DocID), idAndRev.RevID)
×
152
                }
×
153
                seq = &foundSeq
×
154
        }
155
        // should remove entry in the map even if we have a seq available
156
        delete(c.idAndRevLookup, idAndRev)
1✔
157

1✔
158
        c.processedSeqs[*seq] = struct{}{}
1✔
159
        c.stats.ProcessedSequenceCount++
1✔
160

1✔
161
        c.lock.Unlock()
1✔
162
}
163

164
func (c *Checkpointer) AddExpectedSeqs(seqs ...SequenceID) {
1✔
165
        if len(seqs) == 0 {
1✔
166
                // nothing to do
×
167
                return
×
168
        }
×
169

170
        select {
1✔
171
        case <-c.ctx.Done():
×
172
                // replicator already closed, bail out of checkpointing work
×
173
                base.TracefCtx(c.ctx, base.KeyReplicate, "Inside AddExpectedSeqs and context has been cancelled")
×
174
                return
×
175
        default:
1✔
176
        }
177

178
        c.lock.Lock()
1✔
179
        c.expectedSeqs = append(c.expectedSeqs, seqs...)
1✔
180
        c.stats.ExpectedSequenceCount += int64(len(seqs))
1✔
181
        c.lock.Unlock()
1✔
182
}
183

184
func (c *Checkpointer) AddExpectedSeqIDAndRevs(seqs map[IDAndRev]SequenceID) {
1✔
185
        if len(seqs) == 0 {
2✔
186
                // nothing to do
1✔
187
                return
1✔
188
        }
1✔
189

190
        select {
1✔
UNCOV
191
        case <-c.ctx.Done():
×
UNCOV
192
                // replicator already closed, bail out of checkpointing work
×
UNCOV
193
                base.TracefCtx(c.ctx, base.KeyReplicate, "Inside AddExpectedSeqIDAndRevs and context has been cancelled")
×
UNCOV
194
                return
×
195
        default:
1✔
196
        }
197

198
        c.lock.Lock()
1✔
199
        for idAndRev, seq := range seqs {
2✔
200
                c.idAndRevLookup[idAndRev] = seq
1✔
201
                c.expectedSeqs = append(c.expectedSeqs, seq)
1✔
202
        }
1✔
203
        c.stats.ExpectedSequenceCount += int64(len(seqs))
1✔
204
        c.lock.Unlock()
1✔
205
}
206

207
func (c *Checkpointer) Start() {
1✔
208
        // Start a time-based checkpointer goroutine
1✔
209
        if c.checkpointInterval > 0 {
2✔
210
                c.closeWg.Add(1)
1✔
211
                go func() {
2✔
212
                        defer c.closeWg.Done()
1✔
213
                        ticker := time.NewTicker(c.checkpointInterval)
1✔
214
                        defer ticker.Stop()
1✔
215
                        for {
2✔
216
                                select {
1✔
217
                                case <-ticker.C:
1✔
218
                                        base.TracefCtx(c.ctx, base.KeyReplicate, "calling checkpoint now. context is not cancelled here")
1✔
219
                                        c.CheckpointNow()
1✔
220
                                case <-c.ctx.Done():
1✔
221
                                        base.DebugfCtx(c.ctx, base.KeyReplicate, "checkpointer goroutine stopped")
1✔
222
                                        return
1✔
223
                                }
224
                        }
225
                }()
226
        }
227
}
228

229
// CheckpointNow forces the checkpointer to send a checkpoint, and blocks until it has finished.
230
func (c *Checkpointer) CheckpointNow() {
1✔
231
        if c == nil {
1✔
232
                return
×
233
        }
×
234

235
        c.lock.Lock()
1✔
236
        defer c.lock.Unlock()
1✔
237

1✔
238
        base.TracefCtx(c.ctx, base.KeyReplicate, "checkpointer: running")
1✔
239

1✔
240
        seq := c._updateCheckpointLists()
1✔
241
        if seq == nil {
2✔
242
                return
1✔
243
        }
1✔
244

245
        base.InfofCtx(c.ctx, base.KeyReplicate, "checkpointer: calculated seq: %v", seq)
1✔
246
        err := c._setCheckpoints(seq)
1✔
247
        if err != nil {
2✔
248
                base.WarnfCtx(c.ctx, "couldn't set checkpoints: %v", err)
1✔
249
        }
1✔
250
}
251

252
// Stats returns a copy of the checkpointer stats. Intended for test use - non-test usage may have
253
// performance implications associated with locking
254
func (c *Checkpointer) Stats() CheckpointerStats {
1✔
255
        c.lock.Lock()
1✔
256
        defer c.lock.Unlock()
1✔
257
        return c.stats
1✔
258
}
1✔
259

260
// _updateCheckpointLists determines the highest checkpointable sequence, and trims the processedSeqs/expectedSeqs lists up to this point.
// We will also remove all but the last processed sequence as we know we're able to checkpoint safely up to that point without leaving any intermediate sequence numbers around.
//
// Expects c.lock to be held by the caller (CheckpointNow does). Returns nil when no sequence is newly
// checkpointable. Side effects:
//   - sorts expectedSeqs (via _calculateSafeExpectedSeqsIdx);
//   - removes the processed prefix from both lists;
//   - when more than expectedSeqCompactionThreshold expected sequences remain, collapses each run of
//     adjacent processed sequences to its last element;
//   - sets the four sequence-length gauges.
func (c *Checkpointer) _updateCheckpointLists() (safeSeq *SequenceID) {
	base.TracefCtx(c.ctx, base.KeyReplicate, "checkpointer: _updateCheckpointLists(expectedSeqs: %v, processedSeqs: %v)", c.expectedSeqs, c.processedSeqs)
	base.TracefCtx(c.ctx, base.KeyReplicate, "Inside update checkpoint lists")

	c.stats.ProcessedSequenceLen.Set(int64(len(c.processedSeqs)))
	c.stats.ExpectedSequenceLen.Set(int64(len(c.expectedSeqs)))
	// maxI is the last index of the processed prefix of the (now sorted) expectedSeqs, or -1.
	maxI := c._calculateSafeExpectedSeqsIdx()

	if maxI > -1 {
		seq := c.expectedSeqs[maxI]

		// removes to-be checkpointed sequences from processedSeqs list
		for i := 0; i <= maxI; i++ {
			removeSeq := c.expectedSeqs[i]
			delete(c.processedSeqs, removeSeq)
			base.TracefCtx(c.ctx, base.KeyReplicate, "checkpointer: _updateCheckpointLists removed seq %v from processedSeqs map", removeSeq)
		}

		// trim expectedSeqs list from beginning up to first unprocessed seq
		c.expectedSeqs = c.expectedSeqs[maxI+1:]
		safeSeq = &seq
	}

	// if we have many remaining expectedSeqs, see if we can shrink the lists even more
	// compact contiguous blocks of sequences by keeping only the last processed sequence in both lists
	// NOTE(review): each removal below shifts the tail of the slice, so this pass is O(n^2) in the worst case.
	// It also assumes expectedSeqs holds no duplicates: if the same SequenceID appears twice, deleting
	// `current` from processedSeqs makes the surviving copy look unprocessed. Confirm duplicates cannot occur.
	if len(c.expectedSeqs) > c.expectedSeqCompactionThreshold {
		// start at the one before the end of the list (since we know we need to retain that one anyway, if it's processed)
		for i := len(c.expectedSeqs) - 2; i >= 0; i-- {
			current := c.expectedSeqs[i]
			// Iterating backwards, c.expectedSeqs[i+1] is the element that survived the previous step.
			next := c.expectedSeqs[i+1]
			_, processedCurrent := c.processedSeqs[current]
			_, processedNext := c.processedSeqs[next]
			if processedCurrent && processedNext {
				// remove the current sequence from both sets, since we know we've also processed the next sequence and are able to checkpoint that
				delete(c.processedSeqs, current)
				c.expectedSeqs = append(c.expectedSeqs[:i], c.expectedSeqs[i+1:]...)
			}
		}
	}

	c.stats.ProcessedSequenceLenPostCleanup.Set(int64(len(c.processedSeqs)))
	c.stats.ExpectedSequenceLenPostCleanup.Set(int64(len(c.expectedSeqs)))
	return safeSeq
}
306

307
// _calculateSafeExpectedSeqsIdx returns an index into expectedSeqs which is safe to checkpoint.
308
// Returns -1 if no sequence in the list is able to be checkpointed.
309
func (c *Checkpointer) _calculateSafeExpectedSeqsIdx() int {
1✔
310
        safeIdx := -1
1✔
311

1✔
312
        sort.Slice(c.expectedSeqs, func(i, j int) bool {
2✔
313
                return c.expectedSeqs[i].Before(c.expectedSeqs[j])
1✔
314
        })
1✔
315

316
        // iterates over each (ordered) expected sequence and stops when we find the first sequence we've yet to process a rev message for
317
        for i, seq := range c.expectedSeqs {
2✔
318
                if _, ok := c.processedSeqs[seq]; !ok {
2✔
319
                        break
1✔
320
                }
321
                safeIdx = i
1✔
322
        }
323

324
        return safeIdx
1✔
325
}
326

327
// calculateSafeProcessedSeq returns the sequence last processed that is able to be checkpointed, or the last checkpointed sequence.
328
func (c *Checkpointer) calculateSafeProcessedSeq() SequenceID {
1✔
329
        c.lock.Lock()
1✔
330
        defer c.lock.Unlock()
1✔
331
        return c._calculateSafeProcessedSeq()
1✔
332
}
1✔
333

334
func (c *Checkpointer) _calculateSafeProcessedSeq() SequenceID {
1✔
335
        idx := c._calculateSafeExpectedSeqsIdx()
1✔
336
        if idx == -1 {
2✔
337
                return c.lastCheckpointSeq
1✔
338
        }
1✔
339

340
        return c.expectedSeqs[idx]
1✔
341
}
342

343
// Keys of the checkpoint Body produced by replicationCheckpoint.AsBody. They match the JSON tags
// declared on replicationCheckpoint.
const (
	checkpointBodyLastSeq = "last_sequence"
	checkpointBodyHash    = "config_hash"
	checkpointBodyRev     = "_rev"
)
348

349
// replicationCheckpoint stores the config and latest sequence of the replication.
// The same shape is used for the local checkpoint doc and for the remote (SGR2) checkpoint. The JSON tags
// match the checkpointBody* keys written by AsBody. Field order determines JSON encoding order and the
// %v output used in log messages, so keep it stable.
type replicationCheckpoint struct {
	Rev        string `json:"_rev"`          // rev of the stored checkpoint; supplied as the rev when overwriting it
	ConfigHash string `json:"config_hash"`   // compared against Checkpointer.configHash when the checkpoint is read back
	LastSeq    string `json:"last_sequence"` // last checkpointed sequence, in string form
}
355

356
// AsBody returns a Body representation of replicationCheckpoint for use with putSpecial
357
func (r *replicationCheckpoint) AsBody() Body {
1✔
358
        return Body{
1✔
359
                checkpointBodyRev:     r.Rev,
1✔
360
                checkpointBodyLastSeq: r.LastSeq,
1✔
361
                checkpointBodyHash:    r.ConfigHash,
1✔
362
        }
1✔
363
}
1✔
364

365
// NewReplicationCheckpoint converts a revID and checkpoint body into a replicationCheckpoint
366
func NewReplicationCheckpoint(revID string, body []byte) (checkpoint *replicationCheckpoint, err error) {
1✔
367

1✔
368
        err = base.JSONUnmarshal(body, &checkpoint)
1✔
369
        if err != nil {
1✔
370
                return nil, err
×
371
        }
×
372
        checkpoint.Rev = revID
1✔
373
        return checkpoint, nil
1✔
374
}
375

376
func (r *replicationCheckpoint) Copy() *replicationCheckpoint {
1✔
377
        return &replicationCheckpoint{
1✔
378
                Rev:        r.Rev,
1✔
379
                LastSeq:    r.LastSeq,
1✔
380
                ConfigHash: r.ConfigHash,
1✔
381
        }
1✔
382
}
1✔
383

384
// fetchDefaultCollectionCheckpoints gets remote checkpoint for the default collection and determines the lastCheckpointSeq.
385
func (c *Checkpointer) fetchDefaultCollectionCheckpoints() error {
1✔
386
        base.TracefCtx(c.ctx, base.KeyReplicate, "fetchDefaultCollectionCheckpoints()")
1✔
387
        remoteCheckpoint, err := c.getRemoteCheckpoint()
1✔
388
        if err != nil {
1✔
389
                return err
×
390
        }
×
391
        base.DebugfCtx(c.ctx, base.KeyReplicate, "got remote checkpoint: %v", remoteCheckpoint)
1✔
392
        return c.setLastCheckpointSeq(remoteCheckpoint)
1✔
393
}
394

395
// setLastCheckpointCheckpoints sets lastCheckpointSeq for the given Checkpointer by comparing local and remote checkpoints.
396
// Various scenarios this function handles:
397
// - Matching checkpoints from local and remote. Use that sequence.
398
// - Both SGR2 checkpoints are missing, we'll start the replication from zero.
399
// - Mismatched config hashes, use a zero value for sequence, so the replication can restart.
400
// - Mismatched sequences, we'll pick the lower of the two, and attempt to roll back the higher checkpoint to that point.
401
func (c *Checkpointer) setLastCheckpointSeq(remoteCheckpoint *replicationCheckpoint) error {
1✔
402
        base.TracefCtx(c.ctx, base.KeyReplicate, "fetchDefaultCollectionCheckpoints()")
1✔
403

1✔
404
        localCheckpoint, err := c.getLocalCheckpoint()
1✔
405
        if err != nil {
1✔
406
                return err
×
407
        }
×
408

409
        base.DebugfCtx(c.ctx, base.KeyReplicate, "got local checkpoint: %v", localCheckpoint)
1✔
410
        c.lastLocalCheckpointRevID = localCheckpoint.Rev
1✔
411

1✔
412
        c.lastRemoteCheckpointRevID = remoteCheckpoint.Rev
1✔
413

1✔
414
        localSeq := localCheckpoint.LastSeq
1✔
415
        remoteSeq := remoteCheckpoint.LastSeq
1✔
416

1✔
417
        // If localSeq and remoteSeq match, we'll use localSeq as the value.
1✔
418
        checkpointSeq := localSeq
1✔
419

1✔
420
        // Determine the lowest sequence if they don't match
1✔
421
        if localSeq != remoteSeq {
2✔
422
                base.DebugfCtx(c.ctx, base.KeyReplicate, "sequences mismatched, finding lowest of %q %q", localSeq, remoteSeq)
1✔
423
                localSeqVal, err := parseIntegerSequenceID(localSeq)
1✔
424
                if err != nil {
1✔
425
                        return err
×
426
                }
×
427

428
                remoteSeqVal, err := parseIntegerSequenceID(remoteSeq)
1✔
429
                if err != nil {
1✔
430
                        return err
×
431
                }
×
432

433
                // roll local/remote checkpoint back to lowest of the two
434
                if remoteSeqVal.Before(localSeqVal) {
2✔
435
                        checkpointSeq = remoteSeq
1✔
436
                        newLocalCheckpoint := remoteCheckpoint.Copy()
1✔
437
                        newLocalCheckpoint.Rev = c.lastLocalCheckpointRevID
1✔
438
                        c.lastLocalCheckpointRevID, err = c.setLocalCheckpoint(newLocalCheckpoint)
1✔
439
                        if err != nil {
1✔
440
                                base.WarnfCtx(c.ctx, "Unable to roll back local checkpoint: %v", err)
×
441
                        } else {
1✔
442
                                base.InfofCtx(c.ctx, base.KeyReplicate, "Rolled back local checkpoint to remote: %v", remoteSeqVal)
1✔
443
                        }
1✔
444
                } else {
1✔
445
                        // checkpointSeq already set above for localSeq value
1✔
446
                        newRemoteCheckpoint := localCheckpoint.Copy()
1✔
447
                        newRemoteCheckpoint.Rev = c.lastRemoteCheckpointRevID
1✔
448
                        c.lastRemoteCheckpointRevID, err = c.setRemoteCheckpoint(newRemoteCheckpoint)
1✔
449
                        if err != nil {
1✔
450
                                base.WarnfCtx(c.ctx, "Unable to roll back remote checkpoint: %v", err)
×
451
                        } else {
1✔
452
                                base.InfofCtx(c.ctx, base.KeyReplicate, "Rolled back remote checkpoint to local: %v", localSeqVal)
1✔
453
                        }
1✔
454
                }
455
        }
456

457
        // If checkpoint hash has changed, reset checkpoint to zero. Shouldn't need to persist the updated checkpoints in
458
        // the case both get rolled back to zero, can wait for next persistence.
459
        if localCheckpoint.ConfigHash != c.configHash || remoteCheckpoint.ConfigHash != c.configHash {
2✔
460
                base.DebugfCtx(c.ctx, base.KeyReplicate, "replicator config changed, unable to use previous checkpoint")
1✔
461
                checkpointSeq = ""
1✔
462
        }
1✔
463

464
        var parsedCheckpointSeq SequenceID
1✔
465
        if checkpointSeq != "" {
2✔
466
                parsedCheckpointSeq, err = ParsePlainSequenceID(checkpointSeq)
1✔
467
                if err == nil {
2✔
468
                        c.stats.GetCheckpointHitCount++
1✔
469
                } else {
1✔
470
                        base.WarnfCtx(c.ctx, "couldn't parse checkpoint sequence %q, unable to use previous checkpoint: %v", checkpointSeq, err)
×
471
                        c.stats.GetCheckpointMissCount++
×
472
                }
×
473
        } else {
1✔
474
                c.stats.GetCheckpointMissCount++
1✔
475
        }
1✔
476

477
        base.InfofCtx(c.ctx, base.KeyReplicate, "using checkpointed seq: %q", parsedCheckpointSeq.String())
1✔
478
        c.lastCheckpointSeq = parsedCheckpointSeq
1✔
479

1✔
480
        return nil
1✔
481
}
482

483
func (c *Checkpointer) _setCheckpoints(seq *SequenceID) (err error) {
1✔
484
        seqStr := seq.String()
1✔
485
        base.TracefCtx(c.ctx, base.KeyReplicate, "setCheckpoints(%v)", seqStr)
1✔
486
        c.lastLocalCheckpointRevID, err = c.setLocalCheckpointWithRetry(
1✔
487
                &replicationCheckpoint{
1✔
488
                        LastSeq:    seqStr,
1✔
489
                        Rev:        c.lastLocalCheckpointRevID,
1✔
490
                        ConfigHash: c.configHash,
1✔
491
                })
1✔
492
        if err != nil {
2✔
493
                return err
1✔
494
        }
1✔
495

496
        c.lastRemoteCheckpointRevID, err = c.setRemoteCheckpointWithRetry(
1✔
497
                &replicationCheckpoint{
1✔
498
                        LastSeq:    seqStr,
1✔
499
                        Rev:        c.lastRemoteCheckpointRevID,
1✔
500
                        ConfigHash: c.configHash,
1✔
501
                })
1✔
502
        if err != nil {
1✔
503
                return err
×
504
        }
×
505

506
        c.lastCheckpointSeq = *seq
1✔
507
        c.stats.SetCheckpointCount++
1✔
508

1✔
509
        return nil
1✔
510
}
511

512
// getLocalCheckpoint returns the sequence and rev for the local checkpoint.
513
// if the checkpoint does not exist, returns empty sequence and rev.
514
func (c *Checkpointer) getLocalCheckpoint() (checkpoint *replicationCheckpoint, err error) {
1✔
515
        base.TracefCtx(c.ctx, base.KeyReplicate, "getLocalCheckpoint")
1✔
516

1✔
517
        checkpointBytes, err := getSpecialBytes(c.collectionDataStore, DocTypeLocal, CheckpointDocIDPrefix+c.clientID, c.localDocExpirySecs)
1✔
518
        if err != nil {
2✔
519
                if !base.IsDocNotFoundError(err) {
1✔
520
                        return &replicationCheckpoint{}, err
×
521
                }
×
522
                base.DebugfCtx(c.ctx, base.KeyReplicate, "couldn't find existing local checkpoint for client %q", c.clientID)
1✔
523
                return &replicationCheckpoint{}, nil
1✔
524
        }
525

526
        err = base.JSONUnmarshal(checkpointBytes, &checkpoint)
1✔
527
        return checkpoint, err
1✔
528
}
529

530
func (c *Checkpointer) setLocalCheckpoint(checkpoint *replicationCheckpoint) (newRev string, err error) {
1✔
531
        newRev, _, err = putSpecial(c.collectionDataStore, DocTypeLocal, CheckpointDocIDPrefix+c.clientID, checkpoint.Rev, checkpoint.AsBody(), c.localDocExpirySecs)
1✔
532
        if err != nil {
2✔
533
                base.TracefCtx(c.ctx, base.KeyReplicate, "Error setting local checkpoint(%v): %v", checkpoint, err)
1✔
534
                return "", err
1✔
535
        }
1✔
536
        base.TracefCtx(c.ctx, base.KeyReplicate, "setLocalCheckpoint(%v)", checkpoint)
1✔
537
        return newRev, nil
1✔
538
}
539

540
// setLocalCheckpointWithRetry attempts to rewrite the checkpoint if the rev ID is mismatched, or the checkpoint has since been deleted.
541
func (c *Checkpointer) setLocalCheckpointWithRetry(checkpoint *replicationCheckpoint) (newRevID string, err error) {
1✔
542
        return c.setRetry(checkpoint,
1✔
543
                c.setLocalCheckpoint,
1✔
544
                c.getLocalCheckpoint,
1✔
545
        )
1✔
546
}
1✔
547

548
// resetLocalCheckpoint removes the local checkpoint to roll back the replication.
549
func resetLocalCheckpoint(dataStore base.DataStore, checkpointID string) error {
1✔
550
        key := RealSpecialDocID(DocTypeLocal, CheckpointDocIDPrefix+checkpointID)
1✔
551
        if err := dataStore.Delete(key); err != nil && !base.IsDocNotFoundError(err) {
1✔
552
                return err
×
553
        }
×
554
        return nil
1✔
555
}
556

557
// getRemoteCheckpoint returns the sequence and rev for the remote checkpoint.
558
// if the checkpoint does not exist, returns empty sequence and rev.
559
func (c *Checkpointer) getRemoteCheckpoint() (checkpoint *replicationCheckpoint, err error) {
1✔
560
        base.TracefCtx(c.ctx, base.KeyReplicate, "getRemoteCheckpoint")
1✔
561

1✔
562
        rq := GetSGR2CheckpointRequest{
1✔
563
                Client:        c.clientID,
1✔
564
                CollectionIdx: c.collectionIdx,
1✔
565
        }
1✔
566

1✔
567
        if err := rq.Send(c.ctx, c.blipSender); err != nil {
1✔
568
                return &replicationCheckpoint{}, err
×
569
        }
×
570

571
        resp, err := rq.Response()
1✔
572
        if err != nil {
1✔
573
                return &replicationCheckpoint{}, err
×
574
        }
×
575

576
        if resp == nil {
2✔
577
                base.DebugfCtx(c.ctx, base.KeyReplicate, "couldn't find existing remote checkpoint for client %q", c.clientID)
1✔
578
                return &replicationCheckpoint{}, nil
1✔
579
        }
1✔
580

581
        return NewReplicationCheckpoint(resp.RevID, resp.BodyBytes)
1✔
582
}
583

584
func (c *Checkpointer) setRemoteCheckpoint(checkpoint *replicationCheckpoint) (newRev string, err error) {
1✔
585

1✔
586
        base.TracefCtx(c.ctx, base.KeyReplicate, "setRemoteCheckpoint(%v)", checkpoint)
1✔
587

1✔
588
        checkpointBody := checkpoint.AsBody()
1✔
589
        rq := SetSGR2CheckpointRequest{
1✔
590
                Client:        c.clientID,
1✔
591
                Checkpoint:    checkpoint.AsBody(),
1✔
592
                CollectionIdx: c.collectionIdx,
1✔
593
        }
1✔
594

1✔
595
        parentRev, ok := checkpointBody[BodyRev].(string)
1✔
596
        if ok {
2✔
597
                rq.RevID = &parentRev
1✔
598
        }
1✔
599

600
        if err := rq.Send(c.ctx, c.blipSender); err != nil {
1✔
601
                return "", err
×
602
        }
×
603

604
        resp, err := rq.Response()
1✔
605
        if err != nil {
2✔
606
                return "", err
1✔
607
        }
1✔
608

609
        return resp.RevID, nil
1✔
610
}
611

612
// setRemoteCheckpointWithRetry attempts to rewrite the checkpoint if the rev ID is mismatched, or the checkpoint has since been deleted.
613
func (c *Checkpointer) setRemoteCheckpointWithRetry(checkpoint *replicationCheckpoint) (newRevID string, err error) {
1✔
614
        return c.setRetry(checkpoint,
1✔
615
                c.setRemoteCheckpoint,
1✔
616
                c.getRemoteCheckpoint,
1✔
617
        )
1✔
618
}
1✔
619

620
func (c *Checkpointer) getCounts() (expectedCount, processedCount int) {
1✔
621
        c.lock.Lock()
1✔
622
        defer c.lock.Unlock()
1✔
623
        return len(c.expectedSeqs), len(c.processedSeqs)
1✔
624
}
1✔
625

626
// waitForExpectedSequences waits for the expectedSeqs set to drain to zero.
627
// Intended to be used once the replication has been stopped, to wait for
628
// in-flight mutations to complete.
629
// Triggers immediate checkpointing if expectedCount == processedCount.
630
// Waits up to 10s, polling every 100ms.
631
func (c *Checkpointer) waitForExpectedSequences() error {
1✔
632
        waitCount := 0
1✔
633
        for waitCount < 100 {
2✔
634
                expectedCount, processedCount := c.getCounts()
1✔
635
                base.TracefCtx(c.ctx, base.KeyReplicate, "Inside waitForExpectedSequences loop expected %d and processed %d", expectedCount, processedCount)
1✔
636
                if expectedCount == 0 {
2✔
637
                        return nil
1✔
638
                }
1✔
639
                if expectedCount == processedCount {
2✔
640
                        c.CheckpointNow()
1✔
641
                        // Doing an additional check here, instead of just 'continue',
1✔
642
                        // in case of bugs that result in expectedCount==processedCount, but the
1✔
643
                        // sets are not identical.  In that scenario, want to sleep before retrying
1✔
644
                        updatedExpectedCount, _ := c.getCounts()
1✔
645
                        base.TracefCtx(c.ctx, base.KeyReplicate, "Inside waitForExpectedSequences updated expected count %d", updatedExpectedCount)
1✔
646
                        if updatedExpectedCount == 0 {
2✔
647
                                return nil
1✔
648
                        }
1✔
649
                }
650
                time.Sleep(100 * time.Millisecond)
1✔
651
                waitCount++
1✔
652
        }
653
        return errors.New("checkpointer waitForExpectedSequences failed to complete after waiting 10s")
×
654
}
655

656
type setCheckpointFn func(checkpoint *replicationCheckpoint) (revID string, err error)
657
type getCheckpointFn func() (checkpoint *replicationCheckpoint, err error)
658

659
// setRetry is a retry loop for a setCheckpointFn, which will fetch a new RevID from a getCheckpointFn in the event of a write conflict.
660
func (c *Checkpointer) setRetry(checkpoint *replicationCheckpoint, setFn setCheckpointFn, getFn getCheckpointFn) (newRevID string, err error) {
1✔
661
        for numAttempts := 0; numAttempts < 10; numAttempts++ {
2✔
662
                newRevID, err = setFn(checkpoint)
1✔
663
                if err != nil {
2✔
664
                        if strings.HasPrefix(err.Error(), "409") {
2✔
665
                                existingCheckpoint, getErr := getFn()
1✔
666
                                if getErr == nil {
2✔
667
                                        base.InfofCtx(c.ctx, base.KeyReplicate, "Revision mismatch in setCheckpoint - updated from %q to %q based on existing checkpoint, will retry", checkpoint.Rev, existingCheckpoint.Rev)
1✔
668
                                        checkpoint.Rev = existingCheckpoint.Rev
1✔
669
                                } else {
1✔
670
                                        base.InfofCtx(c.ctx, base.KeyReplicate, "Revision mismatch in setCheckpoint, and unable to retrieve existing, will retry", getErr)
×
671
                                        // pause before falling through to retry, in case of temporary failure on getFn
×
672
                                        time.Sleep(time.Millisecond * 100)
×
673
                                }
×
674
                        } else if strings.HasPrefix(err.Error(), "404") {
1✔
675
                                base.WarnfCtx(c.ctx, "checkpoint did not exist for attempted update - removing last known rev ID: %v", err)
×
676
                                checkpoint.Rev = ""
×
677
                        } else {
1✔
678
                                base.WarnfCtx(c.ctx, "got unexpected error from setCheckpoint: %v", err)
1✔
679
                        }
1✔
680
                        continue
1✔
681
                }
682
                return newRevID, nil
1✔
683
        }
684
        return "", errors.New("failed to write checkpoint after 10 attempts")
1✔
685
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc