• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 433

05 Jun 2025 03:32PM UTC coverage: 64.334% (+0.03%) from 64.302%
433

push

jenkins

web-flow
Docs/API: Fix outdated 'keyspace_map' ISGR collections config reference (#7561)

36770 of 57155 relevant lines covered (64.33%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.26
/db/change_cache.go
1
/*
2
Copyright 2016-Present Couchbase, Inc.
3

4
Use of this software is governed by the Business Source License included in
5
the file licenses/BSL-Couchbase.txt.  As of the Change Date specified in that
6
file, in accordance with the Business Source License, use of this software will
7
be governed by the Apache License, Version 2.0, included in the file
8
licenses/APL2.txt.
9
*/
10

11
package db
12

13
import (
14
        "container/heap"
15
        "context"
16
        "errors"
17
        "fmt"
18
        "math"
19
        "strconv"
20
        "strings"
21
        "sync"
22
        "sync/atomic"
23
        "time"
24

25
        sgbucket "github.com/couchbase/sg-bucket"
26
        "github.com/couchbase/sync_gateway/base"
27
        "github.com/couchbase/sync_gateway/channels"
28
)
29

30
// Cache tuning defaults and sentinel values used by sequence buffering.
const (
	DefaultCachePendingSeqMaxNum  = 10000            // Max number of waiting sequences
	DefaultCachePendingSeqMaxWait = 5 * time.Second  // Max time we'll wait for a pending sequence before sending to missed queue
	DefaultSkippedSeqMaxWait      = 60 * time.Minute // Max time we'll wait for an entry in the missing before purging
	QueryTombstoneBatch           = 250              // Max number of tombstones checked per query during Compact
	unusedSeqKey                  = "_unusedSeqKey"  // Key used by ChangeWaiter to mark unused sequences
	unusedSeqCollectionID         = 0                // Collection ID used by ChangeWaiter to mark unused sequences
)
38

39
// Enable keeping a channel-log for the "*" channel (channel.UserStarChannel). The only time this channel is needed is if
// someone has access to "*" (e.g. admin-party) and tracks its changes feed.
var EnableStarChannelLog = true
42

43
// Manages a cache of the recent change history of all channels.
//
// Core responsibilities:
//
// - Receive DCP changes via callbacks
//   - Perform sequence buffering to ensure documents are received in sequence order
//   - Propagating DCP changes down to appropriate channel caches
type changeCache struct {
	db                 *DatabaseContext
	logCtx             context.Context                     // fix in sg-bucket to ProcessEvent
	logsDisabled       bool                                // If true, ignore incoming tap changes
	nextSequence       uint64                              // Next consecutive sequence number to add.  State variable for sequence buffering tracking.  Should use getNextSequence() rather than accessing directly.
	initialSequence    uint64                              // DB's current sequence at startup time.
	receivedSeqs       map[uint64]struct{}                 // Set of all sequences received
	pendingLogs        LogPriorityQueue                    // Out-of-sequence entries waiting to be cached
	notifyChange       func(context.Context, channels.Set) // Client callback that notifies of channel changes
	started            base.AtomicBool                     // Set by the Start method
	stopped            base.AtomicBool                     // Set by the Stop method
	skippedSeqs        *SkippedSequenceSlice               // Skipped sequences still pending on the DCP caching feed
	lock               sync.RWMutex                        // Coordinates access to struct fields
	options            CacheOptions                        // Cache config
	terminator         chan bool                           // Signal termination of background goroutines
	backgroundTasks    []BackgroundTask                    // List of background tasks.
	initTime           time.Time                           // Cache init time - used for latency calculations
	channelCache       ChannelCache                        // Underlying channel cache
	lastAddPendingTime int64                               // The most recent time _addPendingLogs was run, as epoch time
	internalStats      changeCacheStats                    // Running stats for the change cache.  Only applied to expvars on a call to changeCache.updateStats
	cfgEventCallback   base.CfgEventNotifyFunc             // Callback for Cfg updates received over the caching feed
	sgCfgPrefix        string                              // Prefix for SG Cfg doc keys
	metaKeys           *base.MetadataKeys                  // Metadata key formatter
}
74

75
// changeCacheStats holds in-memory running stats for the change cache; values are
// pushed to the database stat counters by changeCache.updateStats.
type changeCacheStats struct {
	highSeqFeed   uint64 // Highest sequence seen on the feed
	pendingSeqLen int    // Current number of pending (out-of-order) sequences
	maxPending    int    // High watermark of the pending sequence queue
}
80

81
func (c *changeCache) updateStats(ctx context.Context) {
1✔
82

1✔
83
        c.lock.RLock()
1✔
84
        defer c.lock.RUnlock()
1✔
85
        if c.db == nil {
2✔
86
                return
1✔
87
        }
1✔
88
        // grab skipped sequence stats
89
        skippedSequenceListStats := c.skippedSeqs.getStats()
1✔
90

1✔
91
        c.db.DbStats.Database().HighSeqFeed.SetIfMax(int64(c.internalStats.highSeqFeed))
1✔
92
        c.db.DbStats.Cache().PendingSeqLen.Set(int64(c.internalStats.pendingSeqLen))
1✔
93
        c.db.DbStats.CBLReplicationPull().MaxPending.SetIfMax(int64(c.internalStats.maxPending))
1✔
94
        c.db.DbStats.Cache().HighSeqStable.Set(int64(c._getMaxStableCached(ctx)))
1✔
95
        c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(skippedSequenceListStats.NumCurrentSkippedSequencesStat)
1✔
96
        c.db.DbStats.Cache().NumSkippedSeqs.Set(skippedSequenceListStats.NumCumulativeSkippedSequencesStat)
1✔
97
        c.db.DbStats.Cache().SkippedSeqLen.Set(skippedSequenceListStats.ListLengthStat)
1✔
98
        c.db.DbStats.Cache().SkippedSeqCap.Set(skippedSequenceListStats.ListCapacityStat)
1✔
99
}
100

101
// LogEntry is an alias for the channels package's LogEntry type.
type LogEntry = channels.LogEntry

// LogEntries is an ordered list of LogEntry pointers.
type LogEntries []*LogEntry

// A priority-queue of LogEntries, kept ordered by increasing sequence #.
type LogPriorityQueue []*LogEntry
107

108
// CacheOptions bundles the tunable configuration for the change cache,
// embedding the per-channel cache options.
type CacheOptions struct {
	ChannelCacheOptions
	CachePendingSeqMaxWait time.Duration // Max wait for pending sequence before skipping
	CachePendingSeqMaxNum  int           // Max number of pending sequences before skipping
	CacheSkippedSeqMaxWait time.Duration // Max wait for skipped sequence before abandoning
}
114

115
func DefaultCacheOptions() CacheOptions {
1✔
116
        return CacheOptions{
1✔
117
                CachePendingSeqMaxWait: DefaultCachePendingSeqMaxWait,
1✔
118
                CachePendingSeqMaxNum:  DefaultCachePendingSeqMaxNum,
1✔
119
                CacheSkippedSeqMaxWait: DefaultSkippedSeqMaxWait,
1✔
120
                ChannelCacheOptions: ChannelCacheOptions{
1✔
121
                        ChannelCacheAge:             DefaultChannelCacheAge,
1✔
122
                        ChannelCacheMinLength:       DefaultChannelCacheMinLength,
1✔
123
                        ChannelCacheMaxLength:       DefaultChannelCacheMaxLength,
1✔
124
                        MaxNumChannels:              DefaultChannelCacheMaxNumber,
1✔
125
                        CompactHighWatermarkPercent: DefaultCompactHighWatermarkPercent,
1✔
126
                        CompactLowWatermarkPercent:  DefaultCompactLowWatermarkPercent,
1✔
127
                        ChannelQueryLimit:           DefaultQueryPaginationLimit,
1✔
128
                },
1✔
129
        }
1✔
130
}
1✔
131

132
// ////// HOUSEKEEPING:
133

134
// Initializes a new changeCache.
// lastSequence is the last known database sequence assigned.
// notifyChange is an optional function that will be called to notify of channel changes.
// After calling Init(), you must call .Start() to start using the cache, otherwise it will be in a locked state
// and callers will block on trying to obtain the lock.

func (c *changeCache) Init(ctx context.Context, dbContext *DatabaseContext, channelCache ChannelCache, notifyChange func(context.Context, channels.Set), options *CacheOptions, metaKeys *base.MetadataKeys) error {
	c.db = dbContext
	c.logCtx = ctx

	c.notifyChange = notifyChange
	c.receivedSeqs = make(map[uint64]struct{})
	c.terminator = make(chan bool)
	c.initTime = time.Now()
	c.skippedSeqs = NewSkippedSequenceSlice(DefaultClipCapacityHeadroom)
	c.lastAddPendingTime = time.Now().UnixNano()
	c.sgCfgPrefix = dbContext.MetadataKeys.SGCfgPrefix(c.db.Options.GroupID)
	c.metaKeys = metaKeys

	// init cache options
	if options != nil {
		c.options = *options
	} else {
		c.options = DefaultCacheOptions()
	}

	c.channelCache = channelCache

	base.InfofCtx(ctx, base.KeyCache, "Initializing changes cache for %s with options %+v", base.UD(c.db.Name), c.options)

	// pendingLogs is a container/heap priority queue; establish heap invariants on the empty queue.
	heap.Init(&c.pendingLogs)

	// background tasks that perform housekeeping duties on the cache.
	// Each task runs at half of its corresponding max-wait interval so entries
	// are processed within at most ~1.5x the configured wait.
	bgt, err := NewBackgroundTask(ctx, "InsertPendingEntries", c.InsertPendingEntries, c.options.CachePendingSeqMaxWait/2, c.terminator)
	if err != nil {
		return err
	}
	c.backgroundTasks = append(c.backgroundTasks, bgt)

	bgt, err = NewBackgroundTask(ctx, "CleanSkippedSequenceQueue", c.CleanSkippedSequenceQueue, c.options.CacheSkippedSeqMaxWait/2, c.terminator)
	if err != nil {
		return err
	}
	c.backgroundTasks = append(c.backgroundTasks, bgt)

	// Lock the cache -- not usable until .Start() called.  This fixes the DCP startup race condition documented in SG #3558.
	// Note: the matching Unlock happens in Start(), not in this function.
	c.lock.Lock()
	return nil
}
183

184
// Start makes the cache usable: it seeds the sequence-buffering state and the
// channel cache's validFrom with initialSequence, then releases the mutex that
// Init() acquired, unblocking any callers waiting on the cache.
// Returns an error if the cache was already started.
func (c *changeCache) Start(initialSequence uint64) error {

	// Unlock the cache after this function returns.
	// (The lock was taken in Init(); see the SG #3558 note there.)
	defer c.lock.Unlock()

	// Set initial sequence for sequence buffering
	c._setInitialSequence(initialSequence)

	// Set initial sequence for cache (validFrom)
	c.channelCache.Init(initialSequence)

	if !c.started.CompareAndSwap(false, true) {
		return errors.New("changeCache already started")
	}

	return nil
}
201

202
// Stops the cache. Clears its state and tells the housekeeping task to stop.
// Safe to call when the cache never started; warns (and returns) if already stopped.
func (c *changeCache) Stop(ctx context.Context) {

	if !c.started.IsTrue() {
		// changeCache never started - nothing to stop
		return
	}

	if !c.stopped.CompareAndSwap(false, true) {
		base.WarnfCtx(ctx, "changeCache was already stopped")
		return
	}

	// Signal to background goroutines that the changeCache has been stopped, so they can exit
	// their loop
	close(c.terminator)

	// Wait for changeCache background tasks to finish.
	waitForBGTCompletion(ctx, BGTCompletionMaxWait, c.backgroundTasks, c.db.Name)

	// Disable further processing of incoming changes (logsDisabled is checked by
	// change-processing paths while the lock is held).
	c.lock.Lock()
	c.logsDisabled = true
	c.lock.Unlock()
}
226

227
// Empty out all channel caches.
// Resets the cache's validFrom to the database's current last sequence, discards
// any buffered pending entries, and clears the underlying channel cache.
// Sequence buffering state (nextSequence) is deliberately left untouched.
func (c *changeCache) Clear(ctx context.Context) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Reset initialSequence so that any new channel caches have their validFrom set to the current last sequence
	// the point at which the change cache was initialized / re-initialized.
	// No need to touch c.nextSequence here, because we don't want to touch the sequence buffering state.
	var err error
	c.initialSequence, err = c.db.LastSequence(ctx)
	if err != nil {
		return err
	}

	// Drop buffered out-of-order entries and re-establish heap invariants on the empty queue.
	c.pendingLogs = nil
	heap.Init(&c.pendingLogs)

	c.initTime = time.Now()

	c.channelCache.Clear()
	return nil
}
249

250
// If set to false, DocChanged() becomes a no-op.
251
func (c *changeCache) EnableChannelIndexing(enable bool) {
×
252
        c.lock.Lock()
×
253
        c.logsDisabled = !enable
×
254
        c.lock.Unlock()
×
255
}
×
256

257
// Triggers addPendingLogs if it hasn't been run in CachePendingSeqMaxWait.  Error returned to fulfil BackgroundTaskFunc signature.
258
func (c *changeCache) InsertPendingEntries(ctx context.Context) error {
1✔
259

1✔
260
        lastAddPendingLogsTime := atomic.LoadInt64(&c.lastAddPendingTime)
1✔
261
        if time.Since(time.Unix(0, lastAddPendingLogsTime)) < c.options.CachePendingSeqMaxWait {
2✔
262
                return nil
1✔
263
        }
1✔
264

265
        // Trigger _addPendingLogs to process any entries that have been pending too long:
266
        c.lock.Lock()
1✔
267
        changedChannels := c._addPendingLogs(ctx)
1✔
268
        if c.notifyChange != nil && len(changedChannels) > 0 {
2✔
269
                c.notifyChange(ctx, changedChannels)
1✔
270
        }
1✔
271
        c.lock.Unlock()
1✔
272

1✔
273
        return nil
1✔
274
}
275

276
// Cleanup function, invoked periodically.
277
// Removes skipped entries from skippedSeqs that have been waiting longer
278
// than CacheSkippedSeqMaxWait from the slice.
279
func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error {
1✔
280

1✔
281
        base.InfofCtx(ctx, base.KeyCache, "Starting CleanSkippedSequenceQueue for database %s", base.MD(c.db.Name))
1✔
282

1✔
283
        compactedSequences := c.skippedSeqs.SkippedSequenceCompact(ctx, int64(c.options.CacheSkippedSeqMaxWait.Seconds()))
1✔
284
        if compactedSequences == 0 {
2✔
285
                base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete.  No sequences to be compacted from skipped sequence list for database %s.", base.MD(c.db.Name))
1✔
286
                return nil
1✔
287
        }
1✔
288

289
        c.db.DbStats.Cache().AbandonedSeqs.Add(compactedSequences)
1✔
290

1✔
291
        base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete.  Cleaned %d sequences from skipped list for database %s.", compactedSequences, base.MD(c.db.Name))
1✔
292
        return nil
1✔
293
}
294

295
// ////// ADDING CHANGES:
296

297
// Note that DocChanged may be executed concurrently for multiple events (in the DCP case, DCP events
// originating from multiple vbuckets).  Only processEntry is locking - all other functionality needs to support
// concurrent processing.
//
// DocChanged is the DCP feed entry point for the change cache: it classifies the
// incoming mutation (principal doc, unused-sequence notification, SG config doc,
// or regular document), unmarshals sync metadata, buffers any unused/deduplicated
// sequences, caches the new revision entry, and notifies change listeners.
func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
	ctx := c.logCtx
	docID := string(event.Key)
	docJSON := event.Value
	changedChannelsCombined := channels.Set{}

	timeReceived := channels.NewFeedTimestamp(&event.TimeReceived)
	// ** This method does not directly access any state of c, so it doesn't lock.
	// Is this a user/role doc for this database?
	if strings.HasPrefix(docID, c.metaKeys.UserKeyPrefix()) {
		c.processPrincipalDoc(ctx, docID, docJSON, true, timeReceived)
		return
	} else if strings.HasPrefix(docID, c.metaKeys.RoleKeyPrefix()) {
		c.processPrincipalDoc(ctx, docID, docJSON, false, timeReceived)
		return
	}

	// Is this an unused sequence notification?
	if strings.HasPrefix(docID, c.metaKeys.UnusedSeqPrefix()) {
		c.processUnusedSequence(ctx, docID, timeReceived)
		return
	}
	if strings.HasPrefix(docID, c.metaKeys.UnusedSeqRangePrefix()) {
		c.processUnusedSequenceRange(ctx, docID)
		return
	}

	// SG config doc: forward to the registered cfg callback (if any) instead of caching.
	if strings.HasPrefix(docID, c.sgCfgPrefix) {
		if c.cfgEventCallback != nil {
			c.cfgEventCallback(docID, event.Cas, nil)
		}
		return
	}

	collection, exists := c.db.CollectionByID[event.CollectionID]
	if !exists {
		cID := event.CollectionID
		if cID == base.DefaultCollectionID && base.MetadataCollectionID == base.DefaultCollectionID {
			// It's possible for the `_default` collection to be associated with other databases writing non-principal documents,
			// but we still need this collection's feed for the sgCfgPrefix docs.
		} else if cID == base.MetadataCollectionID {
			// When Metadata moves to a different collection, we should start to warn again - we don't expect non-metadata mutations here!
			base.WarnfCtx(ctx, "DocChanged(): Non-metadata mutation for doc %q in MetadataStore - kv ID: %d", base.UD(docID), cID)
		} else {
			// Unrecognised collection
			// we shouldn't be receiving mutations for a collection we're not running a database for (except the metadata store)
			base.WarnfCtx(ctx, "DocChanged(): Could not find collection for doc %q - kv ID: %d", base.UD(docID), cID)
		}
		return
	}

	ctx = collection.AddCollectionContext(ctx)

	// If this is a delete and there are no xattrs (no existing SG revision), we can ignore
	if event.Opcode == sgbucket.FeedOpDeletion && len(docJSON) == 0 {
		base.DebugfCtx(ctx, base.KeyCache, "Ignoring delete mutation for %s - no existing Sync Gateway metadata.", base.UD(docID))
		return
	}

	// If this is a binary document (and not one of the above types), we can ignore.  Currently only performing this check when xattrs
	// are enabled, because walrus doesn't support DataType on feed.
	if collection.UseXattrs() && event.DataType == base.MemcachedDataTypeRaw {
		return
	}

	// First unmarshal the doc (just its metadata, to save time/memory):
	syncData, rawBody, rawXattrs, err := UnmarshalDocumentSyncDataFromFeed(docJSON, event.DataType, collection.userXattrKey(), false)
	if err != nil {
		// Avoid log noise related to failed unmarshaling of binary documents.
		if event.DataType != base.MemcachedDataTypeRaw {
			base.DebugfCtx(ctx, base.KeyCache, "Unable to unmarshal sync metadata for feed document %q.  Will not be included in channel cache.  Error: %v", base.UD(docID), err)
		}
		if errors.Is(err, sgbucket.ErrEmptyMetadata) {
			base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event.  docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
		}
		return
	}

	// If using xattrs and this isn't an SG write, we shouldn't attempt to cache.
	rawUserXattr := rawXattrs[collection.userXattrKey()]
	if collection.UseXattrs() {
		if syncData == nil {
			return
		}
		isSGWrite, _, _ := syncData.IsSGWrite(event.Cas, rawBody, rawUserXattr)
		if !isSGWrite {
			return
		}
	}

	// If not using xattrs and no sync metadata found, check whether we're mid-upgrade and attempting to read a doc w/ metadata stored in xattr
	// before ignoring the mutation.
	if !collection.UseXattrs() && !syncData.HasValidSyncData() {
		migratedDoc, _ := collection.checkForUpgrade(ctx, docID, DocUnmarshalNoHistory)
		if migratedDoc != nil && migratedDoc.Cas == event.Cas {
			base.InfofCtx(ctx, base.KeyCache, "Found mobile xattr on doc %q without %s property - caching, assuming upgrade in progress.", base.UD(docID), base.SyncPropertyName)
			syncData = &migratedDoc.SyncData
		} else {
			base.InfofCtx(ctx, base.KeyCache, "changeCache: Doc %q does not have valid sync data.", base.UD(docID))
			collection.dbStats().Cache().NonMobileIgnoredCount.Add(1)
			return
		}
	}

	if syncData.Sequence <= c.initialSequence {
		return // DCP is sending us an old value from before I started up; ignore it
	}

	// Measure feed latency from timeSaved or the time we started working the feed, whichever is later
	var feedLatency time.Duration
	if !syncData.TimeSaved.IsZero() {
		if syncData.TimeSaved.After(c.initTime) {
			feedLatency = time.Since(syncData.TimeSaved)
		} else {
			feedLatency = time.Since(c.initTime)
		}
		// Record latency when greater than zero
		feedNano := feedLatency.Nanoseconds()
		if feedNano > 0 {
			c.db.DbStats.Database().DCPReceivedTime.Add(feedNano)
		}
	}
	c.db.DbStats.Database().DCPReceivedCount.Add(1)

	// If the doc update wasted any sequences due to conflicts, add empty entries for them:
	for _, seq := range syncData.UnusedSequences {
		base.InfofCtx(ctx, base.KeyCache, "Received unused #%d in unused_sequences property for (%q / %q)", seq, base.UD(docID), syncData.CurrentRev)
		change := &LogEntry{
			Sequence:       seq,
			TimeReceived:   timeReceived,
			CollectionID:   event.CollectionID,
			UnusedSequence: true,
		}
		changedChannels := c.processEntry(ctx, change)
		changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
	}

	// If the recent sequence history includes any sequences earlier than the current sequence, and
	// not already seen by the gateway (more recent than c.nextSequence), add them as empty entries
	// so that they are included in sequence buffering.
	// If one of these sequences represents a removal from a channel then set the LogEntry removed flag
	// and the set of channels it was removed from
	currentSequence := syncData.Sequence
	if len(syncData.UnusedSequences) > 0 {
		currentSequence = syncData.UnusedSequences[0]
	}

	if len(syncData.RecentSequences) > 0 {
		nextSequence := c.getNextSequence()

		for _, seq := range syncData.RecentSequences {
			// seq < currentSequence means the sequence is not the latest allocated to this document
			// seq >= nextSequence means this sequence is a pending sequence to be expected in the cache
			// the two conditions above together means that the cache expects us to run processEntry on this sequence as its pending
			// If seq < current sequence allocated to the doc and seq is in skipped list this means that this sequence
			// never arrived over the caching feed due to deduplication and was pushed to a skipped sequence list
			isSkipped := (seq < currentSequence && seq < nextSequence) && c.WasSkipped(seq)
			if (seq >= nextSequence && seq < currentSequence) || isSkipped {
				base.InfofCtx(ctx, base.KeyCache, "Received deduplicated #%d in recent_sequences property for (%q / %q)", seq, base.UD(docID), syncData.CurrentRev)
				change := &LogEntry{
					Sequence:     seq,
					TimeReceived: timeReceived,
					CollectionID: event.CollectionID,
					Skipped:      isSkipped,
				}

				// if the doc was removed from one or more channels at this sequence
				// Set the removed flag and removed channel set on the LogEntry
				if channelRemovals, atRevId := syncData.Channels.ChannelsRemovedAtSequence(seq); len(channelRemovals) > 0 {
					change.DocID = docID
					change.RevID = atRevId
					change.Channels = channelRemovals
				} else {
					change.UnusedSequence = true // treat as unused sequence when sequence is not channel removal
				}

				changedChannels := c.processEntry(ctx, change)
				changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
			}
		}
	}

	// Now add the entry for the new doc revision:
	// A user-xattr mutation may change computed channels, so invalidate the revision cache entry.
	if len(rawUserXattr) > 0 {
		collection.revisionCache.Remove(ctx, docID, syncData.CurrentRev)
	}
	change := &LogEntry{
		Sequence:     syncData.Sequence,
		DocID:        docID,
		RevID:        syncData.CurrentRev,
		Flags:        syncData.Flags,
		TimeReceived: timeReceived,
		Channels:     syncData.Channels,
		CollectionID: event.CollectionID,
	}

	millisecondLatency := int(feedLatency / time.Millisecond)

	// If latency is larger than 1 minute or is negative there is likely an issue and this should be clear to the user
	if millisecondLatency >= 60*1000 {
		base.InfofCtx(ctx, base.KeyChanges, "Received #%d after %3dms (%q / %q)", change.Sequence, millisecondLatency, base.UD(change.DocID), change.RevID)
	} else {
		base.DebugfCtx(ctx, base.KeyChanges, "Received #%d after %3dms (%q / %q)", change.Sequence, millisecondLatency, base.UD(change.DocID), change.RevID)
	}

	changedChannels := c.processEntry(ctx, change)
	changedChannelsCombined = changedChannelsCombined.Update(changedChannels)

	// Notify change listeners for all of the changed channels
	if c.notifyChange != nil && len(changedChannelsCombined) > 0 {
		c.notifyChange(ctx, changedChannelsCombined)
	}

}
514

515
// Simplified principal limited to properties needed by caching
type cachePrincipal struct {
	Name     string `json:"name"`
	Sequence uint64 `json:"sequence"`
}
520

521
func (c *changeCache) Remove(ctx context.Context, collectionID uint32, docIDs []string, startTime time.Time) (count int) {
1✔
522
        return c.channelCache.Remove(ctx, collectionID, docIDs, startTime)
1✔
523
}
1✔
524

525
// Principals unmarshalled during caching don't need to instantiate a real principal - we're just using name and seq from the document
526
func (c *changeCache) unmarshalCachePrincipal(docJSON []byte) (cachePrincipal, error) {
1✔
527
        var principal cachePrincipal
1✔
528
        err := base.JSONUnmarshal(docJSON, &principal)
1✔
529
        return principal, err
1✔
530
}
1✔
531

532
// Process unused sequence notification.  Extracts sequence from docID and sends to cache for buffering
533
func (c *changeCache) processUnusedSequence(ctx context.Context, docID string, timeReceived channels.FeedTimestamp) {
1✔
534
        sequenceStr := strings.TrimPrefix(docID, c.metaKeys.UnusedSeqPrefix())
1✔
535
        sequence, err := strconv.ParseUint(sequenceStr, 10, 64)
1✔
536
        if err != nil {
1✔
537
                base.WarnfCtx(ctx, "Unable to identify sequence number for unused sequence notification with key: %s, error: %v", base.UD(docID), err)
×
538
                return
×
539
        }
×
540
        c.releaseUnusedSequence(ctx, sequence, timeReceived)
1✔
541

542
}
543

544
func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64, timeReceived channels.FeedTimestamp) {
1✔
545
        change := &LogEntry{
1✔
546
                Sequence:       sequence,
1✔
547
                TimeReceived:   timeReceived,
1✔
548
                UnusedSequence: true,
1✔
549
        }
1✔
550
        base.InfofCtx(ctx, base.KeyCache, "Received #%d (unused sequence)", sequence)
1✔
551

1✔
552
        // Since processEntry may unblock pending sequences, if there were any changed channels we need
1✔
553
        // to notify any change listeners that are working changes feeds for these channels
1✔
554
        changedChannels := c.processEntry(ctx, change)
1✔
555
        if changedChannels == nil {
2✔
556
                changedChannels = channels.SetOfNoValidate(unusedSeqChannelID)
1✔
557
        } else {
2✔
558
                changedChannels.Add(unusedSeqChannelID)
1✔
559
        }
1✔
560
        if c.notifyChange != nil && len(changedChannels) > 0 {
2✔
561
                c.notifyChange(ctx, changedChannels)
1✔
562
        }
1✔
563
}
564

565
// releaseUnusedSequenceRange will handle unused sequence range arriving over DCP. It will batch remove from skipped or
// push a range to pending sequences, or both.
// A single-sequence range is routed through processEntry like an individual unused sequence;
// larger ranges are delegated to processUnusedRange.  Listeners are always notified with at
// least the unused-sequence channel.
func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequence uint64, toSequence uint64, timeReceived channels.FeedTimestamp) {

	base.InfofCtx(ctx, base.KeyCache, "Received #%d-#%d (unused sequence range)", fromSequence, toSequence)

	allChangedChannels := channels.SetOfNoValidate(unusedSeqChannelID)

	// if range is single value, just run sequence through process entry and return early
	if fromSequence == toSequence {
		change := &LogEntry{
			Sequence:       toSequence,
			TimeReceived:   timeReceived,
			UnusedSequence: true,
		}
		changedChannels := c.processEntry(ctx, change)
		allChangedChannels = allChangedChannels.Update(changedChannels)
		if c.notifyChange != nil {
			c.notifyChange(ctx, allChangedChannels)
		}
		return
	}

	// push unused range to either pending or skipped lists based on current state of the change cache
	allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived)

	if c.notifyChange != nil {
		c.notifyChange(ctx, allChangedChannels)
	}
}
595

596
// processUnusedRange handles pushing unused range to pending or skipped lists.  Takes the
// cache lock.  Returns the input channel set extended with any channels changed by
// pending-log processing.
func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSequence uint64, allChangedChannels channels.Set, timeReceived channels.FeedTimestamp) channels.Set {
	c.lock.Lock()
	defer c.lock.Unlock()

	var numSkipped int64
	if toSequence < c.nextSequence {
		// batch remove from skipped
		numSkipped = c.skippedSeqs.processUnusedSequenceRangeAtSkipped(ctx, fromSequence, toSequence)
	} else if fromSequence >= c.nextSequence {
		// whole range to pending
		c._pushRangeToPending(fromSequence, toSequence, timeReceived)
		// unblock any pending sequences we can after new range(s) have been pushed to pending
		changedChannels := c._addPendingLogs(ctx)
		allChangedChannels = allChangedChannels.Update(changedChannels)
		c.internalStats.pendingSeqLen = len(c.pendingLogs)
	} else {
		// An unused sequence range that includes c.nextSequence in the middle of the range
		// isn't possible under normal processing - unused sequence ranges will normally be moved
		// from pending to skipped in their entirety, as it's the processing of the pending sequence
		// *after* the range that triggers the range to be skipped.  A partial range in skipped means
		// a duplicate entry with a sequence within the bounds of the range was previously present
		// in pending.
		base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences, will be ignored", fromSequence, toSequence)
	}
	// numSkipped appears to be the count remaining in skipped after removal; when it reaches
	// zero the mutation listener leaves slow-broadcast mode — TODO confirm against skippedSeqs.
	if numSkipped == 0 {
		c.db.mutationListener.BroadcastSlowMode.CompareAndSwap(true, false)
	}
	return allChangedChannels
}
626

627
// _pushRangeToPending will push an unused sequence range to pendingLogs
628
func (c *changeCache) _pushRangeToPending(startSeq, endSeq uint64, timeReceived channels.FeedTimestamp) {
1✔
629

1✔
630
        entry := &LogEntry{
1✔
631
                TimeReceived:   timeReceived,
1✔
632
                Sequence:       startSeq,
1✔
633
                EndSequence:    endSeq,
1✔
634
                UnusedSequence: true,
1✔
635
        }
1✔
636
        heap.Push(&c.pendingLogs, entry)
1✔
637

1✔
638
}
1✔
639

640
// Process unused sequence notification.  Extracts sequence from docID and sends to cache for buffering
641
func (c *changeCache) processUnusedSequenceRange(ctx context.Context, docID string) {
1✔
642
        // _sync:unusedSequences:fromSeq:toSeq
1✔
643
        sequencesStr := strings.TrimPrefix(docID, c.metaKeys.UnusedSeqRangePrefix())
1✔
644
        sequences := strings.Split(sequencesStr, ":")
1✔
645
        if len(sequences) != 2 {
1✔
646
                return
×
647
        }
×
648

649
        fromSequence, err := strconv.ParseUint(sequences[0], 10, 64)
1✔
650
        if err != nil {
1✔
651
                base.WarnfCtx(ctx, "Unable to identify from sequence number for unused sequences notification with key: %s, error:", base.UD(docID), err)
×
652
                return
×
653
        }
×
654
        toSequence, err := strconv.ParseUint(sequences[1], 10, 64)
1✔
655
        if err != nil {
1✔
656
                base.WarnfCtx(ctx, "Unable to identify to sequence number for unused sequence notification with key: %s, error:", base.UD(docID), err)
×
657
                return
×
658
        }
×
659

660
        c.releaseUnusedSequenceRange(ctx, fromSequence, toSequence, channels.NewFeedTimestampFromNow())
1✔
661
}
662

663
// processPrincipalDoc handles a user or role document arriving over the feed, adding a
// placeholder entry for its sequence so the contiguous sequence stream has no gaps.
func (c *changeCache) processPrincipalDoc(ctx context.Context, docID string, docJSON []byte, isUser bool, timeReceived channels.FeedTimestamp) {

	// Currently the cache isn't really doing much with user docs; mostly it needs to know about
	// them because they have sequence numbers, so without them the sequence of sequences would
	// have gaps in it, causing later sequences to get stuck in the queue.
	princ, err := c.unmarshalCachePrincipal(docJSON)
	if err != nil {
		base.WarnfCtx(ctx, "changeCache: Error unmarshaling doc %q: %v", base.UD(docID), err)
		return
	}
	sequence := princ.Sequence

	if sequence <= c.initialSequence {
		return // Tap is sending us an old value from before I started up; ignore it
	}

	// Now add the (somewhat fictitious) entry:
	change := &LogEntry{
		Sequence:     sequence,
		TimeReceived: timeReceived,
		IsPrincipal:  true,
	}
	if isUser {
		change.DocID = "_user/" + princ.Name
	} else {
		change.DocID = "_role/" + princ.Name
	}

	base.InfofCtx(ctx, base.KeyChanges, "Received #%d (%q)", change.Sequence, base.UD(change.DocID))

	changedChannels := c.processEntry(ctx, change)
	if c.notifyChange != nil && len(changedChannels) > 0 {
		c.notifyChange(ctx, changedChannels)
	}
}
698

699
// processEntry handles a newly-arrived LogEntry under the cache lock.  Depending on the
// entry's sequence relative to c.nextSequence it is cached immediately (possibly draining
// pendingLogs), deferred to pendingLogs, or resolved against the skipped-sequence list.
// Returns the set of channels whose caches changed, or nil for duplicates / disabled cache.
func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channels.Set {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.logsDisabled {
		return nil
	}

	sequence := change.Sequence
	if change.Sequence > c.internalStats.highSeqFeed {
		c.internalStats.highSeqFeed = change.Sequence
	}

	// Duplicate handling - there are a few cases where processEntry can be called multiple times for a sequence:
	//   - recentSequences for rapidly updated documents
	//   - principal mutations that don't increment sequence
	// We can cancel processing early in these scenarios.
	// Check if this is a duplicate of an already processed sequence
	if sequence < c.nextSequence && !change.Skipped {
		// check for presence in skippedSeqs, it's possible that change.skipped can be marked false in recent sequence handling
		// but this change is subsequently pushed to skipped before acquiring cache mutex in this function
		if !c.WasSkipped(sequence) {
			base.DebugfCtx(ctx, base.KeyCache, "  Ignoring duplicate of #%d", sequence)
			return nil
		}
	}

	// Check if this is a duplicate of a pending sequence
	if _, found := c.receivedSeqs[sequence]; found {
		base.DebugfCtx(ctx, base.KeyCache, "  Ignoring duplicate of #%d", sequence)
		return nil
	}
	c.receivedSeqs[sequence] = struct{}{}

	var changedChannels channels.Set
	if sequence == c.nextSequence || c.nextSequence == 0 {
		// This is the expected next sequence so we can add it now:
		changedChannels = c._addToCache(ctx, change)
		// Also add any pending sequences that are now contiguous:
		changedChannels = changedChannels.Update(c._addPendingLogs(ctx))
	} else if sequence > c.nextSequence {
		// There's a missing sequence (or several), so put this one on ice until it arrives:
		heap.Push(&c.pendingLogs, change)
		numPending := len(c.pendingLogs)
		c.internalStats.pendingSeqLen = numPending
		if base.LogDebugEnabled(ctx, base.KeyCache) {
			base.DebugfCtx(ctx, base.KeyCache, "  Deferring #%d (%d now waiting for #%d...#%d) doc %q / %q",
				sequence, numPending, c.nextSequence, c.pendingLogs[0].Sequence-1, base.UD(change.DocID), change.RevID)
		}
		// Update max pending high watermark stat
		if numPending > c.internalStats.maxPending {
			c.internalStats.maxPending = numPending
		}

		if numPending > c.options.CachePendingSeqMaxNum {
			// Too many pending; add the oldest one:
			changedChannels = c._addPendingLogs(ctx)
		}
	} else if sequence > c.initialSequence {
		// Out-of-order sequence received!
		// Remove from skipped sequence queue
		if !c.WasSkipped(sequence) {
			// Error removing from skipped sequences
			base.InfofCtx(ctx, base.KeyCache, "  Received unexpected out-of-order change - not in skippedSeqs (seq %d, expecting %d) doc %q / %q", sequence, c.nextSequence, base.UD(change.DocID), change.RevID)
		} else {
			base.InfofCtx(ctx, base.KeyCache, "  Received previously skipped out-of-order change (seq %d, expecting %d) doc %q / %q ", sequence, c.nextSequence, base.UD(change.DocID), change.RevID)
			change.Skipped = true
		}

		changedChannels = changedChannels.Update(c._addToCache(ctx, change))
		// Add to cache before removing from skipped, to ensure lowSequence doesn't get incremented until results are available
		// in cache
		err := c.RemoveSkipped(sequence)
		if err != nil {
			base.DebugfCtx(ctx, base.KeyCache, "Error removing skipped sequence: #%d from cache: %v", sequence, err)
		}
	}
	return changedChannels
}
778

779
// _addToCache adds an entry to the appropriate channels' caches, returning the affected channels.
// Advances c.nextSequence past the entry (or past EndSequence for an unused range).  The
// change.Skipped flag indicates whether it was a change arriving out of sequence.  Caller
// must hold the cache lock.
func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) channels.Set {

	if change.Sequence >= c.nextSequence {
		c.nextSequence = change.Sequence + 1
	}
	// check if change is unused sequence range
	if change.EndSequence != 0 {
		c.nextSequence = change.EndSequence + 1
	}
	delete(c.receivedSeqs, change.Sequence)

	// If unused sequence, notify the cache and return
	if change.UnusedSequence {
		c.channelCache.AddUnusedSequence(change)
		return nil
	}

	if change.IsPrincipal {
		c.channelCache.AddPrincipal(change)
		return nil
	}

	// updatedChannels tracks the set of channels that should be notified of the change.  This includes
	// the change's active channels, as well as any channel removals for the active revision.
	updatedChannels := c.channelCache.AddToCache(ctx, change)
	if base.LogDebugEnabled(ctx, base.KeyChanges) {
		base.DebugfCtx(ctx, base.KeyChanges, " #%d ==> channels %v", change.Sequence, base.UD(updatedChannels))
	}

	// Zero TimeReceived means the entry didn't arrive via the feed; skip DCP caching stats.
	if change.TimeReceived != 0 {
		c.db.DbStats.Database().DCPCachingCount.Add(1)
		c.db.DbStats.Database().DCPCachingTime.Add(change.TimeReceived.Since())
	}

	return updatedChannels
}
817

818
// _addPendingLogs adds the first change(s) from pendingLogs if they're the next sequence.  If not, and we've been
// waiting too long for nextSequence, moves the gap up to the oldest pending entry to the
// skipped queue.  Returns the channels that changed.  Caller must hold the cache lock.
func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
	var changedChannels channels.Set
	var isNext bool

	for len(c.pendingLogs) > 0 {
		oldestPending := c.pendingLogs[0]
		isNext = oldestPending.Sequence == c.nextSequence

		if isNext {
			oldestPending = c._popPendingLog(ctx)
			changedChannels = changedChannels.Update(c._addToCache(ctx, oldestPending))
		} else if oldestPending.Sequence < c.nextSequence {
			// oldest pending is lower than next sequence, should be ignored
			base.InfofCtx(ctx, base.KeyCache, "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached", base.UD(oldestPending.DocID), oldestPending.Sequence, oldestPending.EndSequence, c.nextSequence)
			oldestPending = c._popPendingLog(ctx)

			// If the oldestPending was a range that extended past nextSequence, update nextSequence
			if oldestPending.IsUnusedRange() && oldestPending.EndSequence >= c.nextSequence {
				c.nextSequence = oldestPending.EndSequence + 1
			}
		} else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || c.pendingLogs[0].TimeReceived.OlderOrEqual(c.options.CachePendingSeqMaxWait) {
			//  Skip all sequences up to the oldest Pending
			c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1)
			c.nextSequence = oldestPending.Sequence
		} else {
			// nextSequence is not in pending logs, and pending logs size/age doesn't trigger skipped sequences
			break
		}
	}

	c.internalStats.pendingSeqLen = len(c.pendingLogs)

	atomic.StoreInt64(&c.lastAddPendingTime, time.Now().UnixNano())
	return changedChannels
}
856

857
// _popPendingLog pops the next pending LogEntry from the c.pendingLogs heap.  When the popped entry is an unused range,
// performs a defensive check for duplicates with the next entry in pending.  If unused range overlaps with next entry,
// reduces the unused range to stop at the next pending entry.  Caller must hold the cache lock.
func (c *changeCache) _popPendingLog(ctx context.Context) *LogEntry {
	poppedEntry := heap.Pop(&c.pendingLogs).(*LogEntry)
	// If it's not a range, no additional handling needed
	if !poppedEntry.IsUnusedRange() {
		return poppedEntry
	}
	// If there are no more pending logs, no additional handling needed
	if len(c.pendingLogs) == 0 {
		return poppedEntry
	}

	nextPendingEntry := c.pendingLogs[0]
	// If popped entry range does not overlap with next pending entry, no additional handling needed
	//  e.g. popped [15-20], nextPendingEntry is [25]
	if poppedEntry.EndSequence < nextPendingEntry.Sequence {
		return poppedEntry
	}

	// If nextPendingEntry's sequence duplicates the start of the unused range, ignored popped entry and return next entry instead
	//   e.g. popped [15-20], nextPendingEntry is [15]
	if poppedEntry.Sequence == nextPendingEntry.Sequence {
		base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) has start equal to next pending sequence (%s, %d) - unused range will be ignored", poppedEntry.Sequence, poppedEntry.EndSequence, nextPendingEntry.DocID, nextPendingEntry.Sequence)
		return c._popPendingLog(ctx)
	}

	// Otherwise, reduce the popped unused range to end before the next pending sequence
	//  e.g. popped [15-20], nextPendingEntry is [18]
	base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) overlaps with next pending sequence (%s, %d) - unused range will be truncated", poppedEntry.Sequence, poppedEntry.EndSequence, nextPendingEntry.DocID, nextPendingEntry.Sequence)
	poppedEntry.EndSequence = nextPendingEntry.Sequence - 1
	return poppedEntry
}
891

892
// GetStableSequence returns the cache's stable sequence as a SequenceID.
func (c *changeCache) GetStableSequence(docID string) SequenceID {
	// Stable sequence is independent of docID in changeCache
	return SequenceID{Seq: c.LastSequence()}
}
1✔
896

897
// getChannelCache returns the underlying per-channel cache.
func (c *changeCache) getChannelCache() ChannelCache {
	return c.channelCache
}
900

901
// ////// CHANGE ACCESS:
902

903
func (c *changeCache) GetChanges(ctx context.Context, channel channels.ID, options ChangesOptions) ([]*LogEntry, error) {
×
904

×
905
        if c.stopped.IsTrue() {
×
906
                return nil, base.HTTPErrorf(503, "Database closed")
×
907
        }
×
908
        return c.channelCache.GetChanges(ctx, channel, options)
×
909
}
910

911
// Returns the sequence number the cache is up-to-date with.
912
func (c *changeCache) LastSequence() uint64 {
1✔
913

1✔
914
        lastSequence := c.getNextSequence() - 1
1✔
915
        return lastSequence
1✔
916
}
1✔
917

918
func (c *changeCache) getOldestSkippedSequence(ctx context.Context) uint64 {
1✔
919
        oldestSkippedSeq := c.skippedSeqs.getOldest()
1✔
920
        if oldestSkippedSeq > 0 {
2✔
921
                base.DebugfCtx(ctx, base.KeyChanges, "Get oldest skipped, returning: %d", oldestSkippedSeq)
1✔
922
        }
1✔
923
        return oldestSkippedSeq
1✔
924
}
925

926
// _setInitialSequence sets the initial sequence and positions nextSequence immediately
// after it.  Presumes that change cache is already locked.
func (c *changeCache) _setInitialSequence(initialSequence uint64) {
	c.initialSequence = initialSequence
	c.nextSequence = initialSequence + 1
}
1✔
931

932
// Concurrent-safe get value of nextSequence
933
func (c *changeCache) getNextSequence() (nextSequence uint64) {
1✔
934
        c.lock.RLock()
1✔
935
        nextSequence = c.nextSequence
1✔
936
        c.lock.RUnlock()
1✔
937
        return nextSequence
1✔
938
}
1✔
939

940
// ////// LOG PRIORITY QUEUE -- container/heap callbacks that should not be called directly.   Use heap.Init/Push/etc()
941

942
// Len returns the number of entries in the queue (heap.Interface).
func (h LogPriorityQueue) Len() int           { return len(h) }

// Less orders entries by ascending sequence, so the lowest sequence is popped first.
func (h LogPriorityQueue) Less(i, j int) bool { return h[i].Sequence < h[j].Sequence }

// Swap exchanges the entries at positions i and j.
func (h LogPriorityQueue) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
945

946
// Push appends x to the queue.  Called by heap.Push; not intended for direct use.
func (h *LogPriorityQueue) Push(x interface{}) {
	*h = append(*h, x.(*LogEntry))
}
1✔
949

950
func (h *LogPriorityQueue) Pop() interface{} {
1✔
951
        old := *h
1✔
952
        n := len(old)
1✔
953
        x := old[n-1]
1✔
954
        *h = old[0 : n-1]
1✔
955
        return x
1✔
956
}
1✔
957

958
// ////// SKIPPED SEQUENCE QUEUE
959

960
// RemoveSkipped removes sequence x from the skipped-sequence list.  When the remaining
// skipped count reaches zero, exits the mutation listener's slow-broadcast mode.
func (c *changeCache) RemoveSkipped(x uint64) error {
	numSkipped, err := c.skippedSeqs.removeSeq(x)
	if err != nil {
		return err
	}
	if numSkipped == 0 {
		c.db.mutationListener.BroadcastSlowMode.CompareAndSwap(true, false)
	}
	return nil
}
970

971
// WasSkipped returns true if sequence x is currently in the skipped-sequence list.
func (c *changeCache) WasSkipped(x uint64) bool {
	return c.skippedSeqs.Contains(x)
}
1✔
974

975
// PushSkipped adds the inclusive range [startSeq, endSeq] to the skipped-sequence list and
// enables the mutation listener's slow-broadcast mode.  An inverted range is logged and ignored.
func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq uint64) {
	if startSeq > endSeq {
		base.InfofCtx(ctx, base.KeyCache, "cannot push negative skipped sequence range to skipped list: %d %d", startSeq, endSeq)
		return
	}
	c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
	c.db.mutationListener.BroadcastSlowMode.CompareAndSwap(false, true)
}
983

984
// waitForSequence blocks up to maxWaitTime until the given sequence has been received,
// polling getNextSequence with an exponential-backoff sleeper.  Returns a non-nil error
// when the deadline expires before the sequence arrives.
func (c *changeCache) waitForSequence(ctx context.Context, sequence uint64, maxWaitTime time.Duration) error {
	startTime := time.Now()

	worker := func() (bool, error, interface{}) {
		// Sequence is considered received once nextSequence has advanced past it.
		if c.getNextSequence() >= sequence+1 {
			base.DebugfCtx(ctx, base.KeyCache, "waitForSequence(%d) took %v", sequence, time.Since(startTime))
			return false, nil, nil
		}
		// retry
		return true, nil, nil
	}

	ctx, cancel := context.WithDeadline(ctx, startTime.Add(maxWaitTime))
	sleeper := base.SleeperFuncCtx(base.CreateMaxDoublingSleeperFunc(math.MaxInt64, 1, 100), ctx)
	err, _ := base.RetryLoop(ctx, fmt.Sprintf("waitForSequence(%d)", sequence), worker, sleeper)
	cancel()
	return err
}
1003

1004
// waitForSequenceNotSkipped blocks up to maxWaitTime until the given sequence has been received or skipped.
// Like waitForSequence, but additionally requires the sequence not to be sitting in the
// skipped-sequence list.  Returns a non-nil error when the deadline expires first.
func (c *changeCache) waitForSequenceNotSkipped(ctx context.Context, sequence uint64, maxWaitTime time.Duration) error {
	startTime := time.Now()

	worker := func() (bool, error, interface{}) {
		if c.getNextSequence() >= sequence+1 {
			foundInMissing := c.skippedSeqs.Contains(sequence)
			if !foundInMissing {
				base.DebugfCtx(ctx, base.KeyCache, "waitForSequenceNotSkipped(%d) took %v", sequence, time.Since(startTime))
				return false, nil, nil
			}
		}
		// retry
		return true, nil, nil
	}

	ctx, cancel := context.WithDeadline(ctx, startTime.Add(maxWaitTime))
	sleeper := base.SleeperFuncCtx(base.CreateMaxDoublingSleeperFunc(math.MaxInt64, 1, 100), ctx)
	err, _ := base.RetryLoop(ctx, fmt.Sprintf("waitForSequenceNotSkipped(%d)", sequence), worker, sleeper)
	cancel()
	return err
}
1026

1027
func (c *changeCache) _getMaxStableCached(ctx context.Context) uint64 {
1✔
1028
        oldestSkipped := c.getOldestSkippedSequence(ctx)
1✔
1029
        if oldestSkipped > 0 {
2✔
1030
                return oldestSkipped - 1
1✔
1031
        }
1✔
1032
        return c.nextSequence - 1
1✔
1033
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc