• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 506

22 Jul 2025 02:51PM UTC coverage: 64.951% (+0.01%) from 64.94%
506

push

jenkins

web-flow
Increase WaitUntilReady for tests (#7642)

0 of 8 new or added lines in 3 files covered. (0.0%)

8 existing lines in 3 files now uncovered.

39909 of 61445 relevant lines covered (64.95%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.3
/db/active_replicator.go
1
/*
2
Copyright 2020-Present Couchbase, Inc.
3

4
Use of this software is governed by the Business Source License included in
5
the file licenses/BSL-Couchbase.txt.  As of the Change Date specified in that
6
file, in accordance with the Business Source License, use of this software will
7
be governed by the Apache License, Version 2.0, included in the file
8
licenses/APL2.txt.
9
*/
10

11
package db
12

13
import (
14
        "context"
15
        "encoding/base64"
16
        "errors"
17
        "fmt"
18
        "net/http"
19
        "net/url"
20

21
        "github.com/couchbase/go-blip"
22
        "github.com/couchbase/sync_gateway/base"
23
)
24

25
// ISGRUserAgent is the User-Agent header value sent by blipSync when dialing the
// remote _blipsync endpoint.
// NOTE(review): "ISGR" presumably stands for Inter-Sync Gateway Replication - confirm.
var ISGRUserAgent = base.NewSGProcessUserAgent("ISGR")
26

27
// ActiveReplicator is a wrapper to encapsulate separate push and pull active replicators.
28
type ActiveReplicator struct {
29
        ID     string
30
        Push   *ActivePushReplicator
31
        Pull   *ActivePullReplicator
32
        config *ActiveReplicatorConfig
33
}
34

35
// NewActiveReplicator returns a bidirectional active replicator for the given config.
36
func NewActiveReplicator(ctx context.Context, config *ActiveReplicatorConfig) (*ActiveReplicator, error) {
1✔
37
        ar := &ActiveReplicator{
1✔
38
                ID:     config.ID,
1✔
39
                config: config,
1✔
40
        }
1✔
41

1✔
42
        if pushReplication := config.Direction == ActiveReplicatorTypePush || config.Direction == ActiveReplicatorTypePushAndPull; pushReplication {
2✔
43
                var err error
1✔
44
                ar.Push, err = NewPushReplicator(ctx, config)
1✔
45
                if err != nil {
2✔
46
                        return nil, err
1✔
47
                }
1✔
48
                if ar.config.onComplete != nil {
2✔
49
                        ar.Push.onReplicatorComplete = ar._onReplicationComplete
1✔
50
                }
1✔
51
        }
52

53
        if pullReplication := config.Direction == ActiveReplicatorTypePull || config.Direction == ActiveReplicatorTypePushAndPull; pullReplication {
2✔
54
                var err error
1✔
55
                ar.Pull, err = NewPullReplicator(ctx, config)
1✔
56
                if err != nil {
2✔
57
                        return nil, err
1✔
58
                }
1✔
59
                if ar.config.onComplete != nil {
2✔
60
                        ar.Pull.onReplicatorComplete = ar._onReplicationComplete
1✔
61
                }
1✔
62
        }
63

64
        base.InfofCtx(ctx, base.KeyReplicate, "Created active replicator ID:%s", config.ID)
1✔
65
        return ar, nil
1✔
66
}
67

68
func (ar *ActiveReplicator) Start(ctx context.Context) error {
1✔
69

1✔
70
        if ar.Push == nil && ar.Pull == nil {
1✔
71
                return fmt.Errorf("Attempted to start activeReplicator for %s with neither Push nor Pull defined", base.UD(ar.ID))
×
72
        }
×
73

74
        var pushErr error
1✔
75
        if ar.Push != nil {
2✔
76
                pushErr = ar.Push.Start(ctx)
1✔
77
        }
1✔
78

79
        var pullErr error
1✔
80
        if ar.Pull != nil {
2✔
81
                pullErr = ar.Pull.Start(ctx)
1✔
82
        }
1✔
83

84
        if pushErr != nil {
2✔
85
                return pushErr
1✔
86
        }
1✔
87

88
        if pullErr != nil {
2✔
89
                return pullErr
1✔
90
        }
1✔
91

92
        return nil
1✔
93
}
94

95
func (ar *ActiveReplicator) Stop() error {
1✔
96

1✔
97
        if ar.Push == nil && ar.Pull == nil {
1✔
98
                return fmt.Errorf("Attempted to stop activeReplicator for %s with neither Push nor Pull defined", base.UD(ar.ID))
×
99
        }
×
100

101
        var pushErr error
1✔
102
        if ar.Push != nil {
2✔
103
                pushErr = ar.Push.Stop()
1✔
104
        }
1✔
105

106
        var pullErr error
1✔
107
        if ar.Pull != nil {
2✔
108
                pullErr = ar.Pull.Stop()
1✔
109
        }
1✔
110

111
        if pushErr != nil {
1✔
112
                return pushErr
×
113
        }
×
114

115
        if pullErr != nil {
1✔
116
                return pullErr
×
117
        }
×
118

119
        if base.ValDefault(ar.config.reportHandlerPanicsOnStop, true) {
2✔
120
                if stats := ar.config.ReplicationStatsMap; stats != nil {
2✔
121
                        if val := stats.NumHandlersPanicked.Value(); val > 0 {
1✔
122
                                return fmt.Errorf("%d handlers panicked", val)
×
123
                        }
×
124
                }
125
        }
126

127
        return nil
1✔
128
}
129

130
func (ar *ActiveReplicator) Reset() error {
1✔
131
        var pushErr error
1✔
132
        if ar.Push != nil {
1✔
133
                pushErr = ar.Push.reset()
×
134
        }
×
135

136
        var pullErr error
1✔
137
        if ar.Pull != nil {
2✔
138
                pullErr = ar.Pull.reset()
1✔
139
        }
1✔
140

141
        if pushErr != nil {
1✔
142
                return pushErr
×
143
        }
×
144

145
        if pullErr != nil {
1✔
146
                return pullErr
×
147
        }
×
148

149
        if ar.config.ReplicationStatsMap != nil {
2✔
150
                ar.config.ReplicationStatsMap.Reset()
1✔
151
        }
1✔
152

153
        return nil
1✔
154
}
155

156
// _onReplicationComplete is invoked from Complete in an active replication.  If all replications
157
// associated with the ActiveReplicator are complete, onComplete is invoked
158
func (ar *ActiveReplicator) _onReplicationComplete() {
1✔
159
        allReplicationsComplete := true
1✔
160
        if ar.Push != nil && ar.Push.getState() != ReplicationStateStopped {
2✔
161
                allReplicationsComplete = false
1✔
162
        }
1✔
163
        if ar.Pull != nil && ar.Pull.getState() != ReplicationStateStopped {
1✔
UNCOV
164
                allReplicationsComplete = false
×
UNCOV
165
        }
×
166

167
        if allReplicationsComplete {
2✔
168
                ar.config.onComplete(ar.ID)
1✔
169
        }
1✔
170

171
}
172

173
func (ar *ActiveReplicator) State(ctx context.Context) (state string, errorMessage string) {
1✔
174

1✔
175
        state = ReplicationStateStopped
1✔
176
        if ar.Push != nil {
2✔
177
                state, errorMessage = ar.Push.getStateWithErrorMessage()
1✔
178
        }
1✔
179

180
        if ar.Pull != nil {
2✔
181
                pullState, pullErrorMessage := ar.Pull.getStateWithErrorMessage()
1✔
182
                state = combinedState(ctx, state, pullState)
1✔
183
                if pullErrorMessage != "" {
2✔
184
                        errorMessage = pullErrorMessage
1✔
185
                }
1✔
186
        }
187

188
        return state, errorMessage
1✔
189
}
190

191
func (ar *ActiveReplicator) GetStatus(ctx context.Context) *ReplicationStatus {
1✔
192

1✔
193
        status := &ReplicationStatus{
1✔
194
                ID: ar.ID,
1✔
195
        }
1✔
196
        status.Status, status.ErrorMessage = ar.State(ctx)
1✔
197

1✔
198
        if ar.Pull != nil {
2✔
199
                status.PullReplicationStatus = ar.Pull.GetStatus().PullReplicationStatus
1✔
200
        }
1✔
201

202
        if ar.Push != nil {
2✔
203
                status.PushReplicationStatus = ar.Push.GetStatus().PushReplicationStatus
1✔
204
        }
1✔
205

206
        return status
1✔
207
}
208

209
// connect establishes a blip connection to a remote host.
210
func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sender, bsc *BlipSyncContext, err error) {
1✔
211
        arc.replicationStats.NumConnectAttempts.Add(1)
1✔
212

1✔
213
        ctx := base.CorrelationIDLogCtx(
1✔
214
                arc.config.ActiveDB.AddDatabaseLogContext(base.NewNonCancelCtx().Ctx),
1✔
215
                arc.config.ID+idSuffix)
1✔
216
        if arc.config.RunAs != "" {
2✔
217
                ctx = base.UserLogCtx(ctx, arc.config.RunAs, base.UserDomainSyncGateway, nil)
1✔
218
        } else {
2✔
219
                ctx = arc.config.ActiveDB.AddBucketUserLogContext(ctx)
1✔
220
        }
1✔
221

222
        cancelCtx, cancelFunc := context.WithCancel(context.WithoutCancel(ctx)) // separate cancel context from parent cancel context
1✔
223

1✔
224
        var originPatterns []string // no origin headers for ISGR
1✔
225
        // NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently
1✔
226
        // TODO: CBG-3661 ActiveReplicator subprotocol versions
1✔
227
        // - make this configurable for testing mixed-version replications
1✔
228
        // - if unspecified, default to v2 and v3 until VV is supported with ISGR, then also include v4
1✔
229
        protocols := []string{CBMobileReplicationV3.SubprotocolString(), CBMobileReplicationV2.SubprotocolString()}
1✔
230
        ctx, blipContext, err := NewSGBlipContextWithProtocols(ctx, arc.config.ID+idSuffix, originPatterns, protocols, cancelCtx)
1✔
231
        if err != nil {
1✔
232
                cancelFunc()
×
233
                return nil, nil, err
×
234
        }
×
235
        blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval
1✔
236
        blipContext.OnExitCallback = func() {
2✔
237
                // fall into a reconnect loop only if the connection is unexpectedly closed.
1✔
238
                if ctx.Err() == nil {
2✔
239
                        arc.reconnect()
1✔
240
                }
1✔
241
        }
242

243
        bsc, err = NewBlipSyncContext(ctx, blipContext, arc.config.ActiveDB, arc.replicationStats, cancelFunc)
1✔
244
        if err != nil {
1✔
245
                return nil, nil, err
×
246
        }
×
247

248
        // NewBlipSyncContext has already set deltas as disabled/enabled based on config.ActiveDB.
249
        // If deltas have been disabled in the replication config, override this value
250
        if arc.config.DeltasEnabled == false {
2✔
251
                bsc.sgCanUseDeltas = false
1✔
252
        }
1✔
253

254
        blipSender, err = blipSync(*arc.config.RemoteDBURL, blipContext, arc.config.InsecureSkipVerify)
1✔
255
        if err != nil {
2✔
256
                return nil, nil, err
1✔
257
        }
1✔
258

259
        // set active subprotocol after handshake
260
        err = bsc.SetActiveCBMobileSubprotocol(blipContext.ActiveSubprotocol())
1✔
261
        if err != nil {
1✔
262
                return nil, nil, err
×
263
        }
×
264

265
        return blipSender, bsc, nil
1✔
266
}
267

268
// blipSync opens a connection to the target, and returns a blip.Sender to send messages over.
269
func blipSync(target url.URL, blipContext *blip.Context, insecureSkipVerify bool) (*blip.Sender, error) {
1✔
270
        // GET target database endpoint to see if reachable for exit-early/clearer error message
1✔
271
        req, err := http.NewRequest(http.MethodGet, target.String(), nil)
1✔
272
        if err != nil {
1✔
273
                return nil, err
×
274
        }
×
275
        client := base.GetHttpClientForWebSocket(insecureSkipVerify)
1✔
276
        resp, err := client.Do(req)
1✔
277
        if err != nil {
2✔
278
                return nil, err
1✔
279
        }
1✔
280

281
        err = resp.Body.Close()
1✔
282
        if err != nil {
1✔
283
                return nil, err
×
284
        }
×
285

286
        if resp.StatusCode != http.StatusOK {
2✔
287
                return nil, fmt.Errorf("unexpected status code %d from target database", resp.StatusCode)
1✔
288
        }
1✔
289

290
        // switch to websocket protocol scheme
291
        if target.Scheme == "http" {
2✔
292
                target.Scheme = "ws"
1✔
293
        } else if target.Scheme == "https" {
3✔
294
                target.Scheme = "wss"
1✔
295
        }
1✔
296

297
        // Strip userinfo from the URL, don't need it because of the Basic auth header.
298
        var basicAuthCreds *url.Userinfo
1✔
299
        if target.User != nil {
2✔
300
                // take a copy
1✔
301
                if password, hasPassword := target.User.Password(); hasPassword {
2✔
302
                        basicAuthCreds = url.UserPassword(target.User.Username(), password)
1✔
303
                } else {
2✔
304
                        basicAuthCreds = url.User(target.User.Username())
1✔
305
                }
1✔
306
                target.User = nil
1✔
307
        }
308

309
        config := blip.DialOptions{
1✔
310
                URL:        target.String() + "/_blipsync?" + BLIPSyncClientTypeQueryParam + "=" + string(BLIPClientTypeSGR2),
1✔
311
                HTTPClient: client,
1✔
312
                HTTPHeader: http.Header{
1✔
313
                        base.HTTPHeaderUserAgent: []string{ISGRUserAgent},
1✔
314
                },
1✔
315
        }
1✔
316

1✔
317
        if basicAuthCreds != nil {
2✔
318
                config.HTTPHeader.Add("Authorization", "Basic "+base64UserInfo(basicAuthCreds))
1✔
319
        }
1✔
320

321
        return blipContext.DialConfig(&config)
1✔
322
}
323

324
// base64UserInfo encodes the credentials in i as standard base64 of
// "username:password", the form used in a Basic Authorization header.
// i.String() is not usable here because it URL-encodes the credentials.
func base64UserInfo(i *url.Userinfo) string {
	username := i.Username()
	password, _ := i.Password() // an unset password encodes as empty
	credentials := []byte(username + ":" + password)
	return base64.StdEncoding.EncodeToString(credentials)
}
1✔
330

331
// combinedState reports a combined replication state for a pushAndPull
332
// replication, based on the following criteria:
333
//   - if either replication is in error, return error
334
//   - if either replication is running, return running
335
//   - if both replications are stopped, return stopped
336
func combinedState(ctx context.Context, state1, state2 string) (combinedState string) {
1✔
337
        if state1 == "" {
1✔
338
                return state2
×
339
        }
×
340
        if state2 == "" {
1✔
341
                return state1
×
342
        }
×
343

344
        if state1 == ReplicationStateStopped && state2 == ReplicationStateStopped {
2✔
345
                return ReplicationStateStopped
1✔
346
        }
1✔
347

348
        if state1 == ReplicationStateRunning || state2 == ReplicationStateRunning {
2✔
349
                return ReplicationStateRunning
1✔
350
        }
1✔
351

352
        if state1 == ReplicationStateError || state2 == ReplicationStateError {
2✔
353
                return ReplicationStateError
1✔
354
        }
1✔
355

356
        if state1 == ReplicationStateReconnecting || state2 == ReplicationStateReconnecting {
2✔
357
                return ReplicationStateReconnecting
1✔
358
        }
1✔
359

360
        base.InfofCtx(ctx, base.KeyReplicate, "Unhandled combination of replication states (%s, %s), returning %s", state1, state2, state1)
1✔
361
        return state1
1✔
362
}
363

364
func (ar *ActiveReplicator) purgeCheckpoints() {
1✔
365
        if ar.Pull != nil {
2✔
366
                _ = ar.Pull.reset()
1✔
367
        }
1✔
368

369
        if ar.Push != nil {
2✔
370
                _ = ar.Push.reset()
1✔
371
        }
1✔
372
}
373

374
// LoadReplicationStatus attempts to load both push and pull replication checkpoints, and constructs the combined status
375
func LoadReplicationStatus(ctx context.Context, dbContext *DatabaseContext, replicationID string) (status *ReplicationStatus, err error) {
1✔
376

1✔
377
        status = &ReplicationStatus{
1✔
378
                ID: replicationID,
1✔
379
        }
1✔
380

1✔
381
        pullStatusKey := dbContext.MetadataKeys.ReplicationStatusKey(PullCheckpointID(replicationID))
1✔
382
        pullStatus, _ := getLocalStatus(ctx, dbContext.MetadataStore, pullStatusKey)
1✔
383
        if pullStatus != nil {
2✔
384
                status.PullReplicationStatus = pullStatus.PullReplicationStatus
1✔
385
                status.Status = pullStatus.Status
1✔
386
                status.ErrorMessage = pullStatus.ErrorMessage
1✔
387
                status.LastSeqPull = pullStatus.LastSeqPull
1✔
388
        }
1✔
389

390
        pushStatusKey := dbContext.MetadataKeys.ReplicationStatusKey(PushCheckpointID(replicationID))
1✔
391
        pushStatus, _ := getLocalStatus(ctx, dbContext.MetadataStore, pushStatusKey)
1✔
392
        if pushStatus != nil {
2✔
393
                status.PushReplicationStatus = pushStatus.PushReplicationStatus
1✔
394
                status.Status = pushStatus.Status
1✔
395
                status.ErrorMessage = pushStatus.ErrorMessage
1✔
396
                status.LastSeqPush = pushStatus.LastSeqPush
1✔
397
        }
1✔
398

399
        if pullStatus == nil && pushStatus == nil {
2✔
400
                return nil, errors.New("Replication status not found")
1✔
401
        }
1✔
402

403
        return status, nil
1✔
404
}
405

406
// PushCheckpointID returns the checkpoint ID for the push side of the given
// replication: the replication ID prefixed with "sgr2cp:push:".
func PushCheckpointID(replicationID string) string {
	const pushPrefix = "sgr2cp:push:"
	return pushPrefix + replicationID
}
1✔
409

410
// PullCheckpointID returns the checkpoint ID for the pull side of the given
// replication: the replication ID prefixed with "sgr2cp:pull:".
func PullCheckpointID(replicationID string) string {
	const pullPrefix = "sgr2cp:pull:"
	return pullPrefix + replicationID
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc