• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 532

08 Aug 2025 02:48PM UTC coverage: 64.674% (+0.02%) from 64.65%
532

push

jenkins

web-flow
CBG-4735: Remove the _globalSync xattr with purge to avoid attachment metadata hanging around (#7667)

17 of 17 new or added lines in 2 files covered. (100.0%)

8 existing lines in 3 files now uncovered.

39913 of 61714 relevant lines covered (64.67%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.92
/db/sequence_allocator.go
1
//  Copyright 2012-Present Couchbase, Inc.
2
//
3
//  Use of this software is governed by the Business Source License included
4
//  in the file licenses/BSL-Couchbase.txt.  As of the Change Date specified
5
//  in that file, in accordance with the Business Source License, use of this
6
//  software will be governed by the Apache License, Version 2.0, included in
7
//  the file licenses/APL2.txt.
8

9
package db
10

11
import (
12
        "context"
13
        "encoding/binary"
14
        "fmt"
15
        "math"
16
        "sync"
17
        "time"
18

19
        "github.com/couchbase/sync_gateway/base"
20
)
21

22
const (
	// UnusedSequenceTTL is the expiry (in seconds) for unused sequence documents: 10 minutes.
	UnusedSequenceTTL = 10 * 60

	// defaultReleaseSequenceWait is the maximum time to wait after a reserve before releasing sequences.
	defaultReleaseSequenceWait = 1500 * time.Millisecond

	// maxBatchSize is the maximum sequence allocation batch size.
	maxBatchSize = 10

	// sequenceBatchMultiplier is the factor by which to grow the sequence batch size.
	sequenceBatchMultiplier = 2

	// idleBatchSize is the initial batch size when SG is in an idle state (with respect to writes).
	idleBatchSize = 1

	// syncSeqCorrectionValue is the minimum number that _sync:seq will be corrected by in the event of a rollback.
	// Accounts for a maximum SG cluster size of 50 nodes, each allocating a full batch size of 10.
	// If this value is too low, the correction has the potential to allocate sequences that other
	// nodes have already reserved a batch for.
	syncSeqCorrectionValue = 500

	// MaxSequencesToRelease is the maximum number of sequences to release as part of nextSequenceGreaterThan.
	MaxSequencesToRelease = 10000000
)
46

47
// MaxSequenceIncrFrequency is the maximum frequency we want to perform incr operations.
// Incr operations occurring more frequently than this value trigger an increase
// in batch size.  Defined as var to simplify test usage.
var MaxSequenceIncrFrequency = 1000 * time.Millisecond
51

52
// sequenceAllocator issues database sequences backed by the _sync:seq counter document,
// reserving batches of sequences per incr operation to reduce counter contention.
// Reserved-but-unassigned sequences are released as unused-sequence documents by a
// background monitor goroutine (releaseSequenceMonitor).
type sequenceAllocator struct {
	datastore               base.DataStore      // Bucket whose counter to use
	dbStats                 *base.DatabaseStats // For updating per-db sequence allocation stats
	mutex                   sync.Mutex          // Makes this object thread-safe
	last                    uint64              // The last sequence allocated by this allocator.
	max                     uint64              // The range from (last+1) to max represents previously reserved sequences available for use.
	terminator              chan struct{}       // Terminator for releaseUnusedSequences goroutine
	reserveNotify           chan struct{}       // Channel for reserve notifications
	sequenceBatchSize       uint64              // Current sequence allocation batch size
	lastSequenceReserveTime time.Time           // Time of most recent sequence reserve (guarded by mutex)
	releaseSequenceWait     time.Duration       // Supports test customization
	metaKeys                *base.MetadataKeys  // Key generator for sequence and unused sequence documents
}
65

66
func newSequenceAllocator(ctx context.Context, datastore base.DataStore, dbStatsMap *base.DatabaseStats, metaKeys *base.MetadataKeys) (*sequenceAllocator, error) {
1✔
67
        if dbStatsMap == nil {
1✔
68
                return nil, fmt.Errorf("dbStatsMap parameter must be non-nil")
×
69
        }
×
70

71
        s := &sequenceAllocator{
1✔
72
                datastore: datastore,
1✔
73
                dbStats:   dbStatsMap,
1✔
74
                metaKeys:  metaKeys,
1✔
75
        }
1✔
76
        s.terminator = make(chan struct{})
1✔
77
        s.sequenceBatchSize = idleBatchSize
1✔
78
        s.releaseSequenceWait = defaultReleaseSequenceWait
1✔
79

1✔
80
        // The reserveNotify channel manages communication between the releaseSequenceMonitor goroutine and _reserveSequenceRange invocations.
1✔
81
        s.reserveNotify = make(chan struct{}, 1)
1✔
82
        _, err := s.lastSequence(ctx) // just reads latest sequence from bucket
1✔
83
        if err != nil {
1✔
84
                return nil, err
×
85
        }
×
86
        go func() {
2✔
87
                defer base.FatalPanicHandler()
1✔
88
                s.releaseSequenceMonitor(ctx)
1✔
89
        }()
1✔
90
        return s, err
1✔
91
}
92

93
func (s *sequenceAllocator) Stop(ctx context.Context) {
1✔
94

1✔
95
        // Trigger stop and release of unused sequences
1✔
96
        close(s.terminator)
1✔
97
        s.releaseUnusedSequences(ctx)
1✔
98
}
1✔
99

100
// Release sequence monitor runs in its own goroutine, and releases allocated sequences
// that aren't used within 'releaseSequenceTimeout'.
func (s *sequenceAllocator) releaseSequenceMonitor(ctx context.Context) {

	// Terminator is only checked while in idle state - ensures sequence allocation drains and
	// unused sequences are released before exiting.
	// Timer starts with an effectively-infinite duration; it's armed with the real release
	// wait on the first reserve notification.
	timer := time.NewTimer(math.MaxInt64)
	defer timer.Stop()

	for {
		select {
		case <-s.reserveNotify:
			// On reserve, start the timer to release unused sequences. A new reserve resets the timer.
			// On timeout, release sequences and return to idle state
			_ = timer.Reset(s.releaseSequenceWait)
		case <-timer.C:
			// No reserve activity within the wait window - release anything unused.
			s.releaseUnusedSequences(ctx)
		case <-s.terminator:
			// Shutdown requested - release any remaining unused sequences before exiting.
			s.releaseUnusedSequences(ctx)
			return
		}
	}
}
123

124
// Releases any currently reserved, non-allocated sequences.
125
func (s *sequenceAllocator) releaseUnusedSequences(ctx context.Context) {
1✔
126
        s.mutex.Lock()
1✔
127
        if s.last == s.max {
2✔
128
                s.mutex.Unlock()
1✔
129
                return
1✔
130
        }
1✔
131
        if s.last < s.max {
2✔
132
                _, err := s.releaseSequenceRange(ctx, s.last+1, s.max)
1✔
133
                if err != nil {
1✔
134
                        base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d]. Falling back to skipped sequence handling.  Error:%v", s.last+1, s.max, err)
×
135
                }
×
136
        }
137
        // Reduce batch size for next incr by the unused amount
138
        unusedAmount := s.max - s.last
1✔
139

1✔
140
        // If no sequences from the last batch were used, assume system is idle
1✔
141
        // and drop back to the idle batch size.
1✔
142
        if unusedAmount >= s.sequenceBatchSize {
1✔
143
                s.sequenceBatchSize = idleBatchSize
×
144
        } else {
1✔
145
                // Some sequences were used - reduce batch size by the unused amount.
1✔
146
                s.sequenceBatchSize = s.sequenceBatchSize - unusedAmount
1✔
147
        }
1✔
148

149
        s.last = s.max
1✔
150
        s.mutex.Unlock()
1✔
151
}
152

153
// Retrieves the last allocated sequence.  If there hasn't been an allocation yet by this node,
154
// retrieves the value of the _sync:seq counter from the bucket by doing an incr(0)
155
func (s *sequenceAllocator) lastSequence(ctx context.Context) (uint64, error) {
1✔
156
        s.mutex.Lock()
1✔
157
        lastSeq := s.last
1✔
158
        s.mutex.Unlock()
1✔
159

1✔
160
        if lastSeq > 0 {
2✔
161
                return lastSeq, nil
1✔
162
        }
1✔
163
        s.dbStats.SequenceGetCount.Add(1)
1✔
164
        last, err := s.getSequence()
1✔
165
        if err != nil {
1✔
166
                return 0, fmt.Errorf("Couldn't get sequence from bucket: %w", err)
×
167
        }
×
168
        return last, err
1✔
169
}
170

171
// Returns the next available sequence.
172
// If previously reserved sequences are available (s.last < s.max), returns one
173
// and increments s.last.
174
// If no previously reserved sequences are available, reserves new batch.
175
func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64, err error) {
1✔
176
        s.mutex.Lock()
1✔
177
        sequence, sequencesReserved, err := s._nextSequence(ctx)
1✔
178
        s.mutex.Unlock()
1✔
179
        if err != nil {
1✔
180
                return 0, err
×
181
        }
×
182
        // If sequences were reserved, send notification to the release sequence monitor, to start the clock for releasing these sequences.
183
        // Must be done after mutex is released.
184
        if sequencesReserved {
2✔
185
                s.reserveNotify <- struct{}{}
1✔
186
        }
1✔
187
        return sequence, nil
1✔
188
}
189

190
// _releaseCurrentBatch releases any unused sequences currently held by the allocator.
191
// Writes unused sequence document while holding s.lock, shouldn't be used by high-throughput operations.
192
func (s *sequenceAllocator) _releaseCurrentBatch(ctx context.Context) (numReleased uint64, err error) {
1✔
193
        if s.max > s.last {
2✔
194
                numReleased, err = s.releaseSequenceRange(ctx, s.last+1, s.max)
1✔
195
                if err != nil {
1✔
196
                        return 0, err
×
197
                }
×
198
                s.last = s.max
1✔
199
        }
200
        return numReleased, nil
1✔
201
}
202

203
// nextSequenceGreaterThan increments _sync:seq such that it's greater than existingSequence + s.sequenceBatchSize
204
// In the case where our local s.max < _sync:seq (another node has incremented _sync:seq), we may be releasing
205
// sequences greater than existingSequence, but we will only ever release sequences allocated by this node's incr operation
206
func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existingSequence uint64) (sequence uint64, releasedSequenceCount uint64, err error) {
1✔
207

1✔
208
        targetSequence := existingSequence + 1
1✔
209
        s.mutex.Lock()
1✔
210
        // If the target sequence is less than or equal to one we've already allocated, can assign the sequence in the standard way
1✔
211
        if targetSequence <= s.last {
2✔
212
                sequence, sequencesReserved, err := s._nextSequence(ctx)
1✔
213
                s.mutex.Unlock()
1✔
214
                if err != nil {
1✔
215
                        return 0, 0, err
×
216
                }
×
217
                if sequencesReserved {
1✔
218
                        s.reserveNotify <- struct{}{}
×
219
                }
×
220
                return sequence, 0, nil
1✔
221
        }
222

223
        // If the target sequence is in our existing batch (between s.last and s.max), we want to release all unused sequences in the batch earlier
224
        // than targetSequence, and then assign as targetSequence
225
        if targetSequence <= s.max {
2✔
226
                releaseFrom := s.last + 1
1✔
227
                s.last = targetSequence
1✔
228
                s.dbStats.LastSequenceAssignedValue.Set(int64(targetSequence))
1✔
229
                s.mutex.Unlock()
1✔
230
                if releaseFrom < targetSequence {
2✔
231
                        released, err := s.releaseSequenceRange(ctx, releaseFrom, targetSequence-1)
1✔
232
                        if err != nil {
1✔
233
                                base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] from existing batch. Will be handled by skipped sequence handling.  Error:%v", releaseFrom, targetSequence-1, err)
×
234
                        }
×
235
                        releasedSequenceCount += released
1✔
236
                }
237
                s.dbStats.SequenceAssignedCount.Add(1)
1✔
238
                return targetSequence, releasedSequenceCount, nil
1✔
239

240
        }
241

242
        // At this point we need to allocate a sequence that's larger than what's in our current batch, so we first need to release the current batch.
243
        var numReleasedBatch uint64
1✔
244
        numReleasedBatch, err = s._releaseCurrentBatch(ctx)
1✔
245
        if err != nil {
1✔
246
                base.InfofCtx(ctx, base.KeyCache, "Unable to release current batch during nextSequenceGreaterThan for existing sequence %d. Will be handled by skipped sequence handling. %v", existingSequence, err)
×
247
        }
×
248
        releasedSequenceCount += numReleasedBatch
1✔
249

1✔
250
        syncSeq, err := s.getSequence()
1✔
251
        if err != nil {
1✔
252
                base.WarnfCtx(ctx, "Unable to fetch current sequence during nextSequenceGreaterThan for existing sequence %d. Error:%v", existingSequence, err)
×
253
                s.mutex.Unlock()
×
254
                return 0, 0, err
×
255
        }
×
256

257
        // check for rollback of _sync:seq before continuing
258
        if syncSeq < s.last {
2✔
259
                // rollback of _sync:seq detected
1✔
260
                syncSeq, err = s._fixSyncSeqRollback(ctx, syncSeq, s.last)
1✔
261
                if err != nil {
1✔
262
                        s.mutex.Unlock()
×
263
                        return 0, 0, err
×
264
                }
×
265
        }
266

267
        // If the target sequence is less than the current syncSeq, allocate as normal using _nextSequence
268
        if syncSeq >= targetSequence {
2✔
269
                sequence, sequencesReserved, err := s._nextSequence(ctx)
1✔
270
                s.mutex.Unlock()
1✔
271
                if err != nil {
1✔
272
                        return 0, 0, err
×
273
                }
×
274
                if sequencesReserved {
2✔
275
                        s.reserveNotify <- struct{}{}
1✔
276
                }
1✔
277
                return sequence, releasedSequenceCount, nil
1✔
278
        }
279

280
        // If the target sequence is greater than the current _sync:seq, we want to:
281
        // (a) Reserve n sequences past _sync:seq, where n = existingSequence - syncSeq.  It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and
282
        //   updated _sync:seq since we last updated s.max.)
283
        // (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way.
284
        // (c) Release the reserved sequences from part (a)
285
        numberToRelease := existingSequence - syncSeq
1✔
286
        numberToAllocate := s.sequenceBatchSize
1✔
287
        incrVal := numberToRelease + numberToAllocate
1✔
288

1✔
289
        // if sequences to release are above the max allowed, return error to cancel update
1✔
290
        if numberToRelease > MaxSequencesToRelease {
2✔
291
                s.mutex.Unlock()
1✔
292
                s.dbStats.CorruptSequenceCount.Add(1) // increment corrupt sequence count
1✔
293
                return 0, 0, base.ErrMaxSequenceReleasedExceeded
1✔
294
        }
1✔
295

296
        allocatedToSeq, err := s._incrementSequence(incrVal)
1✔
297
        if err != nil {
1✔
298
                base.WarnfCtx(ctx, "Error from _incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err)
×
299
                s.mutex.Unlock()
×
300
                return 0, 0, err
×
301
        }
×
302

303
        s.max = allocatedToSeq
1✔
304
        s.last = allocatedToSeq - numberToAllocate + 1
1✔
305
        sequence = s.last
1✔
306
        s.dbStats.LastSequenceAssignedValue.Set(int64(sequence))
1✔
307
        s.mutex.Unlock()
1✔
308

1✔
309
        // Perform standard batch handling and stats updates
1✔
310
        s.lastSequenceReserveTime = time.Now()
1✔
311
        s.reserveNotify <- struct{}{}
1✔
312
        s.dbStats.SequenceAssignedCount.Add(1)
1✔
313

1✔
314
        // Release the newly allocated sequences that were used to catch up to existingSequence (d)
1✔
315
        if numberToRelease > 0 {
2✔
316
                releaseTo := allocatedToSeq - numberToAllocate
1✔
317
                releaseFrom := releaseTo - numberToRelease + 1 // +1, as releaseSequenceRange is inclusive
1✔
318
                released, err := s.releaseSequenceRange(ctx, releaseFrom, releaseTo)
1✔
319
                if err != nil {
1✔
320
                        base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] to reach target sequence. Will be handled by skipped sequence handling.  Error:%v", releaseFrom, releaseTo, err)
×
321
                }
×
322
                releasedSequenceCount += released
1✔
323
        }
324

325
        return sequence, releasedSequenceCount, err
1✔
326
}
327

328
// _nextSequence reserves if needed, and then returns the next sequence
329
func (s *sequenceAllocator) _nextSequence(ctx context.Context) (sequence uint64, sequencesReserved bool, err error) {
1✔
330
        if s.last >= s.max {
2✔
331
                if err := s._reserveSequenceBatch(ctx); err != nil {
1✔
332
                        return 0, false, err
×
333
                }
×
334
                sequencesReserved = true
1✔
335
        }
336
        s.last++
1✔
337
        sequence = s.last
1✔
338
        s.dbStats.SequenceAssignedCount.Add(1)
1✔
339
        s.dbStats.LastSequenceAssignedValue.Set(int64(sequence))
1✔
340
        return sequence, sequencesReserved, nil
1✔
341
}
342

343
// Reserve a new sequence range, based on batch size.  Called by nextSequence when the previously allocated sequences have all been used.
344
func (s *sequenceAllocator) _reserveSequenceBatch(ctx context.Context) error {
1✔
345

1✔
346
        // If the time elapsed since the last reserveSequenceRange invocation reserve is shorter than our target frequency,
1✔
347
        // this indicates we're making an incr call more frequently than we want to.  Triggers an increase in batch size to
1✔
348
        // reduce incr frequency.
1✔
349
        if time.Since(s.lastSequenceReserveTime) < MaxSequenceIncrFrequency {
2✔
350
                s.sequenceBatchSize = s.sequenceBatchSize * sequenceBatchMultiplier
1✔
351
                if s.sequenceBatchSize > maxBatchSize {
2✔
352
                        s.sequenceBatchSize = maxBatchSize
1✔
353
                }
1✔
354
                base.DebugfCtx(ctx, base.KeyCRUD, "Increased sequence batch to %d", s.sequenceBatchSize)
1✔
355
        }
356

357
        max, err := s._incrementSequence(s.sequenceBatchSize)
1✔
358
        if err != nil {
1✔
359
                base.WarnfCtx(ctx, "Error from incrementSequence in _reserveSequences(%d): %v", s.sequenceBatchSize, err)
×
360
                return err
×
361
        }
×
362

363
        // check for rollback of _sync:seq document
364
        minimumExpectedValue := s.max + s.sequenceBatchSize
1✔
365
        if max < minimumExpectedValue {
2✔
366
                // rollback of _sync:seq detected
1✔
367
                max, err = s._fixSyncSeqRollback(ctx, max, minimumExpectedValue)
1✔
368
                if err != nil {
1✔
369
                        return err
×
370
                }
×
371
        }
372

373
        // Update max and last used sequences.  Last is updated here to account for sequences allocated/used by other
374
        // Sync Gateway nodes
375
        s.max = max
1✔
376
        s.last = max - s.sequenceBatchSize
1✔
377
        s.lastSequenceReserveTime = time.Now()
1✔
378

1✔
379
        return nil
1✔
380
}
381

382
// getSequence gets the current _sync:seq counter document value without modifying it.
// Retry handling provided by bucket.Get.
func (s *sequenceAllocator) getSequence() (max uint64, err error) {
	return base.GetCounter(s.datastore, s.metaKeys.SyncSeqKey())
}
1✔
386

387
// Increments the _sync:seq document.  Retry handling provided by bucket.Incr.
388
// Expects sequenceAllocator.mutex to be held to ensure consistent LastSequenceReservedValue updates.
389
func (s *sequenceAllocator) _incrementSequence(numToReserve uint64) (uint64, error) {
1✔
390
        value, err := s.datastore.Incr(s.metaKeys.SyncSeqKey(), numToReserve, numToReserve, 0)
1✔
391
        if err != nil {
1✔
392
                return value, err
×
393
        }
×
394
        s.dbStats.SequenceIncrCount.Add(1)
1✔
395
        s.dbStats.SequenceReservedCount.Add(int64(numToReserve))
1✔
396
        s.dbStats.LastSequenceReservedValue.Set(int64(value))
1✔
397
        return value, nil
1✔
398
}
399

400
// ReleaseSequence writes an unused sequence document, used to notify sequence buffering that a sequence has been allocated and not used.
401
// Sequence is stored as the document body to avoid null doc issues.
402
func (s *sequenceAllocator) releaseSequence(ctx context.Context, sequence uint64) error {
1✔
403
        key := fmt.Sprintf("%s%d", s.metaKeys.UnusedSeqPrefix(), sequence)
1✔
404
        body := make([]byte, 8)
1✔
405
        binary.LittleEndian.PutUint64(body, sequence)
1✔
406
        _, err := s.datastore.AddRaw(key, UnusedSequenceTTL, body)
1✔
407
        if err != nil {
1✔
408
                return err
×
409
        }
×
410
        s.dbStats.SequenceReleasedCount.Add(1)
1✔
411
        base.DebugfCtx(ctx, base.KeyCRUD, "Released unused sequence #%d", sequence)
1✔
412
        return nil
1✔
413
}
414

415
// releaseSequenceRange writes a binary document with the key _sync:unusedSeqs:fromSeq:toSeq.
416
// fromSeq and toSeq are inclusive (i.e. both fromSeq and toSeq are unused).
417
// From and to seq are stored as the document contents to avoid null doc issues.
418
// Returns the number of sequences released.
419
func (s *sequenceAllocator) releaseSequenceRange(ctx context.Context, fromSequence, toSequence uint64) (uint64, error) {
1✔
420

1✔
421
        // Exit if there's nothing to release
1✔
422
        if toSequence == 0 || toSequence < fromSequence {
1✔
423
                return 0, nil
×
424
        }
×
425
        key := s.metaKeys.UnusedSeqRangeKey(fromSequence, toSequence)
1✔
426
        body := make([]byte, 16)
1✔
427
        binary.LittleEndian.PutUint64(body[:8], fromSequence)
1✔
428
        binary.LittleEndian.PutUint64(body[8:16], toSequence)
1✔
429
        _, err := s.datastore.AddRaw(key, UnusedSequenceTTL, body)
1✔
430
        if err != nil {
1✔
431
                return 0, err
×
432
        }
×
433
        count := toSequence - fromSequence + 1
1✔
434
        s.dbStats.SequenceReleasedCount.Add(int64(count))
1✔
435
        base.DebugfCtx(ctx, base.KeyCRUD, "Released unused sequences #%d-#%d", fromSequence, toSequence)
1✔
436
        return count, nil
1✔
437
}
438

439
// waitForReleasedSequences blocks for 'releaseSequenceWait' past the provided startTime.
440
// Used to guarantee assignment of allocated sequences on other nodes.
441
func (s *sequenceAllocator) waitForReleasedSequences(ctx context.Context, startTime time.Time) (waitedFor time.Duration) {
1✔
442

1✔
443
        requiredWait := s.releaseSequenceWait - time.Since(startTime)
1✔
444
        if requiredWait < 0 {
2✔
445
                return 0
1✔
446
        }
1✔
447
        base.InfofCtx(ctx, base.KeyCache, "Waiting %v for sequence allocation...", requiredWait)
1✔
448
        time.Sleep(requiredWait)
1✔
449
        return requiredWait
1✔
450
}
451

452
// _fixSyncSeqRollback will correct a rolled back _sync:seq document in the bucket.
// prevAllocTo is the (rolled-back) value observed; expectedValue is the minimum value
// _sync:seq should have had.  Returns the end of a fresh batch reserved for this node
// after the correction.  Caller is expected to hold s.mutex (underscore-prefixed).
func (s *sequenceAllocator) _fixSyncSeqRollback(ctx context.Context, prevAllocTo, expectedValue uint64) (allocatedToSeq uint64, err error) {
	base.WarnfCtx(ctx, "rollback of _sync:seq document detected. Allocated to %d but expected value of at least %d", prevAllocTo, expectedValue)
	// find diff between current _sync:seq value and what we expected it to be + correction value
	// (syncSeqCorrectionValue padding avoids colliding with batches other nodes reserved pre-rollback)
	correctionIncrValue := (expectedValue - prevAllocTo) + syncSeqCorrectionValue

	// worker returns (shouldRetry, err, value) for base.RetryLoop: retry only on CAS mismatch.
	worker := func() (bool, error, interface{}) {
		// grab _sync:seq value + its current cas value
		var result uint64
		cas, err := s.datastore.Get(s.metaKeys.SyncSeqKey(), &result)
		if err != nil {
			return false, err, 0
		}
		// set the value to _sync:seq current value + incr value if result above is below that value
		setVal := result + correctionIncrValue
		if result < setVal {
			_, err = s.datastore.WriteCas(s.metaKeys.SyncSeqKey(), 0, cas, setVal, 0)
			if base.IsCasMismatch(err) {
				// retry on cas error
				return true, err, nil
			}
			if err == nil {
				// successfully corrected _sync:seq value above
				base.DebugfCtx(ctx, base.KeyCRUD, "corrected _sync:seq document from value %d by the value of %d", prevAllocTo, setVal)
				return false, nil, setVal
			}
		}
		// if we get here we either had error above that is not a cas mismatch thus we need to exit with failure or the result
		// from the fetch of _sync:seq was larger than expected (_sync:seq may have been fixed by another node)
		return false, err, 0
	}

	retryErr, _ := base.RetryLoop(ctx, "Fix _sync:seq Value", worker, base.CreateDoublingSleeperFunc(1000, 5))
	if retryErr != nil {
		base.WarnfCtx(ctx, "error: %v in retry loop to correct _sync:seq value", retryErr)
		return 0, retryErr
	}

	base.DebugfCtx(ctx, base.KeyCRUD, "_sync:seq value successfully corrected, entering normal sequence batch processing for this node")
	// if _sync:seq has been fixed successfully just increment by batch size to get new unique batch for this node
	allocatedToSeq, err = s._incrementSequence(s.sequenceBatchSize)
	if err != nil {
		return 0, err
	}

	return allocatedToSeq, err
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc