• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 394

07 May 2025 11:13AM UTC coverage: 64.019% (-0.5%) from 64.469%
394

push

jenkins

web-flow
CBG-4605: dcp mode for caching tool (#7483)

0 of 423 new or added lines in 5 files covered. (0.0%)

8 existing lines in 4 files now uncovered.

36619 of 57200 relevant lines covered (64.02%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/tools/cache_perf_tool/dcpDataGeneration.go
1
// Copyright 2025-Present Couchbase, Inc.
2
//
3
// Use of this software is governed by the Business Source License included
4
// in the file licenses/BSL-Couchbase.txt.  As of the Change Date specified
5
// in that file, in accordance with the Business Source License, use of this
6
// software will be governed by the Apache License, Version 2.0, included in
7
// the file licenses/APL2.txt.
8

9
package main
10

11
import (
12
        "context"
13
        "expvar"
14
        "fmt"
15
        "log"
16
        "strconv"
17
        "testing"
18
        "time"
19

20
        "github.com/couchbase/gocbcore/v10"
21
        sgbucket "github.com/couchbase/sg-bucket"
22
        "github.com/couchbase/sync_gateway/base"
23
        "github.com/couchbase/sync_gateway/channels"
24
        "github.com/couchbase/sync_gateway/db"
25
        "github.com/couchbaselabs/rosmar"
26
)
27

28
var hlc = rosmar.NewHybridLogicalClock(0)
29

30
type dcpDataGen struct {
31
        seqAlloc          *sequenceAllocator
32
        delays            []time.Duration
33
        dbCtx             *db.DatabaseContext
34
        client            *base.DCPClient
35
        numChannelsPerDoc int
36
        numTotalChannels  int
37
        simRapidUpdate    bool
38
}
39

NEW
40
func (dcp *dcpDataGen) vBucketCreation(ctx context.Context) {
×
NEW
41
        delayIndex := 0
×
NEW
42
        // vBucket creation logic
×
NEW
43
        for i := 0; i < 1024; i++ {
×
NEW
44
                time.Sleep(500 * time.Millisecond) // we need a slight delay each iteration otherwise many vBuckets end up writing at the same times
×
NEW
45
                if i == 520 {
×
NEW
46
                        go dcp.syncSeqVBucketCreation(ctx, uint16(i), 2*time.Second) // sync seq hot vBucket so high delay
×
NEW
47
                } else {
×
NEW
48
                        // iterate through provided delays and assign to vBucket, when we get to end of delay list reset index and
×
NEW
49
                        // start from start again this will ensure some consistency between runs of the same parameters
×
NEW
50
                        if delayIndex == len(dcp.delays) {
×
NEW
51
                                delayIndex = 0 // reset index so we don't go out of bounds
×
NEW
52
                        }
×
NEW
53
                        go dcp.vBucketGoroutine(ctx, uint16(i), dcp.delays[delayIndex])
×
NEW
54
                        delayIndex++
×
55
                }
56
        }
57
}
58

NEW
59
func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay time.Duration) {
×
NEW
60
        numGoroutines.Add(1)
×
NEW
61
        defer numGoroutines.Add(-1)
×
NEW
62
        vbSeq := uint64(0)
×
NEW
63
        chanCount := 0
×
NEW
64
        var err error
×
NEW
65
        var newArr []byte
×
NEW
66
        var seqList []uint64
×
NEW
67
        if delay.Nanoseconds() == 0 {
×
NEW
68
                // mutate as fast as possible
×
NEW
69
                for {
×
NEW
70
                        var sgwSeqno uint64
×
NEW
71
                        if dcp.simRapidUpdate && vbSeq%2 == 0 { // simulate rapid update on subset of vBuckets if enabled
×
NEW
72
                                seqList = dcp.seqAlloc.nextNSequences(5)
×
NEW
73
                        } else {
×
NEW
74
                                sgwSeqno = dcp.seqAlloc.nextSeq()
×
NEW
75
                        }
×
NEW
76
                        casVal := uint64(hlc.Now())
×
NEW
77
                        select {
×
NEW
78
                        case <-ctx.Done():
×
NEW
79
                                return
×
NEW
80
                        default:
×
NEW
81
                                vbSeq++
×
NEW
82
                                dcpMutation := gocbcore.DcpMutation{
×
NEW
83
                                        VbID:     vbNo,
×
NEW
84
                                        SeqNo:    vbSeq,
×
NEW
85
                                        StreamID: vbNo,
×
NEW
86
                                        Flags:    0,
×
NEW
87
                                        RevNo:    1,
×
NEW
88
                                        Expiry:   0,
×
NEW
89
                                        Cas:      casVal,
×
NEW
90
                                        Datatype: 5,
×
NEW
91
                                        Key:      []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)),
×
NEW
92
                                }
×
NEW
93

×
NEW
94
                                if sgwSeqno == 0 {
×
NEW
95
                                        newArr, chanCount, err = dcp.mutateWithDedupe(seqList, chanCount, casVal)
×
NEW
96
                                } else {
×
NEW
97
                                        newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount, casVal)
×
NEW
98
                                }
×
NEW
99
                                if err != nil {
×
NEW
100
                                        log.Printf("Error setting sequence: %v", err)
×
NEW
101
                                        return
×
NEW
102
                                }
×
NEW
103
                                dcpMutation.Value = newArr
×
NEW
104
                                dcp.client.Mutation(dcpMutation)
×
105
                        }
106
                }
107
        }
108

NEW
109
        ticker := time.NewTicker(delay)
×
NEW
110
        defer ticker.Stop()
×
NEW
111
        // we have goroutine with a write delay associated with it
×
NEW
112
        for {
×
NEW
113
                var sgwSeqno uint64
×
NEW
114
                // allocate seq before wait on ticker
×
NEW
115
                if dcp.simRapidUpdate && vbSeq%2 == 0 { // simulate rapid update on subset of vBuckets if enabled
×
NEW
116
                        seqList = dcp.seqAlloc.nextNSequences(5)
×
NEW
117
                } else {
×
NEW
118
                        sgwSeqno = dcp.seqAlloc.nextSeq()
×
NEW
119
                }
×
NEW
120
                casVal := uint64(hlc.Now())
×
NEW
121
                select {
×
NEW
122
                case <-ctx.Done():
×
NEW
123
                        return
×
NEW
124
                case <-ticker.C:
×
NEW
125
                        vbSeq++
×
NEW
126
                        dcpMutation := gocbcore.DcpMutation{
×
NEW
127
                                VbID:     vbNo,
×
NEW
128
                                SeqNo:    vbSeq,
×
NEW
129
                                StreamID: vbNo,
×
NEW
130
                                Flags:    0,
×
NEW
131
                                RevNo:    1,
×
NEW
132
                                Expiry:   0,
×
NEW
133
                                Cas:      casVal,
×
NEW
134
                                Datatype: 5,
×
NEW
135
                                Key:      []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)),
×
NEW
136
                        }
×
NEW
137

×
NEW
138
                        if sgwSeqno == 0 {
×
NEW
139
                                newArr, chanCount, err = dcp.mutateWithDedupe(seqList, chanCount, casVal)
×
NEW
140
                        } else {
×
NEW
141
                                newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount, casVal)
×
NEW
142
                        }
×
NEW
143
                        if err != nil {
×
NEW
144
                                log.Printf("Error setting sequence: %v", err)
×
NEW
145
                                return
×
NEW
146
                        }
×
NEW
147
                        dcpMutation.Value = newArr
×
NEW
148
                        dcp.client.Mutation(dcpMutation)
×
149
                }
150
        }
151
}
152

NEW
153
func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16, delay time.Duration) {
×
NEW
154
        numGoroutines.Add(1)
×
NEW
155
        defer numGoroutines.Add(-1)
×
NEW
156
        ticker := time.NewTicker(delay)
×
NEW
157
        defer ticker.Stop()
×
NEW
158
        vbSeq := uint64(0)
×
NEW
159
        chanCount := 0
×
NEW
160
        var err error
×
NEW
161
        var newArr []byte
×
NEW
162
        go func() {
×
NEW
163
                numGoroutines.Add(1)
×
NEW
164
                defer numGoroutines.Add(-1)
×
NEW
165
                for {
×
NEW
166
                        select {
×
NEW
167
                        case <-ctx.Done():
×
NEW
168
                                return
×
NEW
169
                        case _ = <-dcp.seqAlloc.syncSeqEvent:
×
NEW
170
                                // channel has cap of 1 so sort of simulates dedupe on kv side
×
NEW
171
                                vbSeq++
×
NEW
172
                                dcpMutation := gocbcore.DcpMutation{
×
NEW
173
                                        VbID:     vbNo,
×
NEW
174
                                        SeqNo:    vbSeq,
×
NEW
175
                                        StreamID: vbNo,
×
NEW
176
                                        Flags:    0,
×
NEW
177
                                        RevNo:    1,
×
NEW
178
                                        Expiry:   0,
×
NEW
179
                                        Cas:      uint64(hlc.Now()),
×
NEW
180
                                        Datatype: 5,
×
NEW
181
                                        Key:      []byte("_sync:seq"),
×
NEW
182
                                        Value:    sgbucket.EncodeValueWithXattrs([]byte{50}),
×
NEW
183
                                }
×
NEW
184
                                dcp.client.Mutation(dcpMutation)
×
185
                        }
186
                }
187
        }()
188

NEW
189
        for {
×
NEW
190
                // only allocate sgw seqno on actual write (not sync seq event) and BEFORE the delay as sgw will have allocated
×
NEW
191
                // it's seqno before any delay on vBuckets
×
NEW
192
                sgwSeqno := dcp.seqAlloc.nextSeq()
×
NEW
193
                casVal := uint64(hlc.Now())
×
NEW
194
                select {
×
NEW
195
                case <-ctx.Done():
×
NEW
196
                        return
×
NEW
197
                case <-ticker.C:
×
NEW
198
                        vbSeq++
×
NEW
199
                        dcpMutation := gocbcore.DcpMutation{
×
NEW
200
                                VbID:     vbNo,
×
NEW
201
                                SeqNo:    vbSeq,
×
NEW
202
                                StreamID: vbNo,
×
NEW
203
                                Flags:    0,
×
NEW
204
                                RevNo:    1,
×
NEW
205
                                Expiry:   0,
×
NEW
206
                                Cas:      casVal,
×
NEW
207
                                Datatype: 5,
×
NEW
208
                                Key:      []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)),
×
NEW
209
                        }
×
NEW
210

×
NEW
211
                        newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount, casVal)
×
NEW
212
                        if err != nil {
×
NEW
213
                                log.Printf("Error setting sequence: %v", err)
×
NEW
214
                                return
×
NEW
215
                        }
×
NEW
216
                        dcpMutation.Value = newArr
×
NEW
217
                        dcp.client.Mutation(dcpMutation)
×
218
                }
219
        }
220
}
221

NEW
222
func (dcp *dcpDataGen) mutateSyncData(sgwSeqno uint64, chanCount int, casValue uint64) ([]byte, int, error) {
×
NEW
223
        chanMap := make(channels.ChannelMap)
×
NEW
224
        chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannelsPerDoc)
×
NEW
225
        chanSetMap := base.Set{}
×
NEW
226
        for i := 0; i < dcp.numChannelsPerDoc; i++ {
×
NEW
227
                if chanCount == dcp.numTotalChannels {
×
NEW
228
                        chanCount = 0 // reset channel count so we don't go out of bounds
×
NEW
229
                }
×
NEW
230
                chanName := "test-" + strconv.Itoa(chanCount)
×
NEW
231
                chanMap[chanName] = nil
×
NEW
232
                chanSet = append(chanSet, db.ChannelSetEntry{
×
NEW
233
                        Name:  chanName,
×
NEW
234
                        Start: sgwSeqno,
×
NEW
235
                })
×
NEW
236
                chanSetMap[chanName] = struct{}{}
×
NEW
237
                chanCount++
×
238
        }
NEW
239
        revInf := db.RevInfo{
×
NEW
240
                ID:       "1-abc",
×
NEW
241
                Channels: chanSetMap,
×
NEW
242
        }
×
NEW
243
        revTree := db.RevTree{
×
NEW
244
                "1-abc": &revInf,
×
NEW
245
        }
×
NEW
246

×
NEW
247
        syncData := db.SyncData{
×
NEW
248
                Sequence:        sgwSeqno,
×
NEW
249
                CurrentRev:      "1-abc",
×
NEW
250
                History:         revTree,
×
NEW
251
                Channels:        chanMap,
×
NEW
252
                ChannelSet:      chanSet,
×
NEW
253
                TimeSaved:       time.Now(),
×
NEW
254
                Cas:             base.CasToString(casValue),
×
NEW
255
                RecentSequences: []uint64{sgwSeqno},
×
NEW
256
        }
×
NEW
257
        byteArrSync, err := base.JSONMarshal(syncData)
×
NEW
258
        if err != nil {
×
NEW
259
                log.Printf("Error marshalling sync data: %v", err)
×
NEW
260
                return nil, 0, err
×
NEW
261
        }
×
262

NEW
263
        inp := sgbucket.Xattr{
×
NEW
264
                Name:  "_sync",
×
NEW
265
                Value: byteArrSync,
×
NEW
266
        }
×
NEW
267
        encodedVal := sgbucket.EncodeValueWithXattrs([]byte(`{"some":"body"}`), inp)
×
NEW
268
        return encodedVal, chanCount, nil
×
269
}
270

NEW
271
func (dcp *dcpDataGen) mutateWithDedupe(seqs []uint64, chanCount int, casValue uint64) ([]byte, int, error) {
×
NEW
272
        chanMap := make(channels.ChannelMap)
×
NEW
273
        chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannelsPerDoc)
×
NEW
274
        currSeq := seqs[len(seqs)-1] // grab current seq form end of seq list
×
NEW
275
        chanSetMap := base.Set{}
×
NEW
276
        for i := 0; i < dcp.numChannelsPerDoc; i++ {
×
NEW
277
                if chanCount == dcp.numTotalChannels {
×
NEW
278
                        chanCount = 0 // reset channel count so we don't go out of bounds
×
NEW
279
                }
×
NEW
280
                chanName := "test-" + strconv.Itoa(chanCount)
×
NEW
281
                chanMap[chanName] = nil
×
NEW
282
                chanSet = append(chanSet, db.ChannelSetEntry{
×
NEW
283
                        Name:  chanName,
×
NEW
284
                        Start: currSeq,
×
NEW
285
                })
×
NEW
286
                chanSetMap[chanName] = struct{}{}
×
NEW
287
                chanCount++
×
288
        }
289

NEW
290
        revTree := make(db.RevTree)
×
NEW
291
        var revInf db.RevInfo
×
NEW
292
        for i := 1; i <= len(seqs); i++ {
×
NEW
293
                dummyRevID := fmt.Sprintf("%d-abc", i)
×
NEW
294
                revInf = db.RevInfo{
×
NEW
295
                        ID:       dummyRevID,
×
NEW
296
                        Channels: chanSetMap,
×
NEW
297
                }
×
NEW
298
                revTree[dummyRevID] = &revInf
×
NEW
299
        }
×
NEW
300
        currRev := fmt.Sprintf("%d-abc", len(seqs))
×
NEW
301

×
NEW
302
        syncData := db.SyncData{
×
NEW
303
                Sequence:        currSeq,
×
NEW
304
                CurrentRev:      currRev,
×
NEW
305
                History:         revTree,
×
NEW
306
                Channels:        chanMap,
×
NEW
307
                ChannelSet:      chanSet,
×
NEW
308
                TimeSaved:       time.Now(),
×
NEW
309
                Cas:             base.CasToString(casValue),
×
NEW
310
                RecentSequences: seqs,
×
NEW
311
        }
×
NEW
312
        byteArrSync, err := base.JSONMarshal(syncData)
×
NEW
313
        if err != nil {
×
NEW
314
                log.Printf("Error marshalling sync data: %v", err)
×
NEW
315
                return nil, 0, err
×
NEW
316
        }
×
317

NEW
318
        inp := sgbucket.Xattr{
×
NEW
319
                Name:  "_sync",
×
NEW
320
                Value: byteArrSync,
×
NEW
321
        }
×
NEW
322
        encodedVal := sgbucket.EncodeValueWithXattrs([]byte(`{"some":"body"}`), inp)
×
NEW
323
        return encodedVal, chanCount, nil
×
324
}
325

NEW
326
func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.DCPClient, error) {
×
NEW
327
        options := base.DCPClientOptions{
×
NEW
328
                MetadataStoreType: base.DCPMetadataStoreInMemory,
×
NEW
329
                GroupID:           "",
×
NEW
330
                DbStats:           dbStats,
×
NEW
331
                CollectionIDs:     []uint32{0},
×
NEW
332
                AgentPriority:     gocbcore.DcpAgentPriorityMed,
×
NEW
333
                CheckpointPrefix:  "",
×
NEW
334
                NumWorkers:        numWorkers,
×
NEW
335
        }
×
NEW
336
        // fake client that we want to hook into
×
NEW
337
        client, err := base.NewDCPClientForTest(ctx, t, "test", callback, options, bucket, uint16(numVBuckets))
×
NEW
338
        if err != nil {
×
NEW
339
                return nil, err
×
NEW
340
        }
×
341
        // we want to start dcp workers but not client given we aren't streaming data from a bucket for this test
NEW
342
        client.StartWorkersForTest(t)
×
NEW
343
        return client, err
×
344
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc