• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 371

23 Apr 2025 10:20AM UTC coverage: 64.555% (+0.003%) from 64.552%
371

push

jenkins

web-flow
CBG-4456 fix unit test (#7494)

36559 of 56632 relevant lines covered (64.56%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.27
/db/active_replicator.go
1
/*
2
Copyright 2020-Present Couchbase, Inc.
3

4
Use of this software is governed by the Business Source License included in
5
the file licenses/BSL-Couchbase.txt.  As of the Change Date specified in that
6
file, in accordance with the Business Source License, use of this software will
7
be governed by the Apache License, Version 2.0, included in the file
8
licenses/APL2.txt.
9
*/
10

11
package db
12

13
import (
14
        "context"
15
        "encoding/base64"
16
        "errors"
17
        "fmt"
18
        "net/http"
19
        "net/url"
20

21
        "github.com/couchbase/go-blip"
22
        "github.com/couchbase/sync_gateway/base"
23
)
24

25
// ActiveReplicator is a wrapper to encapsulate separate push and pull active replicators.
26
type ActiveReplicator struct {
27
        ID     string
28
        Push   *ActivePushReplicator
29
        Pull   *ActivePullReplicator
30
        config *ActiveReplicatorConfig
31
}
32

33
// NewActiveReplicator returns a bidirectional active replicator for the given config.
34
func NewActiveReplicator(ctx context.Context, config *ActiveReplicatorConfig) (*ActiveReplicator, error) {
1✔
35
        ar := &ActiveReplicator{
1✔
36
                ID:     config.ID,
1✔
37
                config: config,
1✔
38
        }
1✔
39

1✔
40
        if pushReplication := config.Direction == ActiveReplicatorTypePush || config.Direction == ActiveReplicatorTypePushAndPull; pushReplication {
2✔
41
                var err error
1✔
42
                ar.Push, err = NewPushReplicator(ctx, config)
1✔
43
                if err != nil {
2✔
44
                        return nil, err
1✔
45
                }
1✔
46
                if ar.config.onComplete != nil {
2✔
47
                        ar.Push.onReplicatorComplete = ar._onReplicationComplete
1✔
48
                }
1✔
49
        }
50

51
        if pullReplication := config.Direction == ActiveReplicatorTypePull || config.Direction == ActiveReplicatorTypePushAndPull; pullReplication {
2✔
52
                var err error
1✔
53
                ar.Pull, err = NewPullReplicator(ctx, config)
1✔
54
                if err != nil {
2✔
55
                        return nil, err
1✔
56
                }
1✔
57
                if ar.config.onComplete != nil {
2✔
58
                        ar.Pull.onReplicatorComplete = ar._onReplicationComplete
1✔
59
                }
1✔
60
        }
61

62
        base.InfofCtx(ctx, base.KeyReplicate, "Created active replicator ID:%s", config.ID)
1✔
63
        return ar, nil
1✔
64
}
65

66
func (ar *ActiveReplicator) Start(ctx context.Context) error {
1✔
67

1✔
68
        if ar.Push == nil && ar.Pull == nil {
1✔
69
                return fmt.Errorf("Attempted to start activeReplicator for %s with neither Push nor Pull defined", base.UD(ar.ID))
×
70
        }
×
71

72
        var pushErr error
1✔
73
        if ar.Push != nil {
2✔
74
                pushErr = ar.Push.Start(ctx)
1✔
75
        }
1✔
76

77
        var pullErr error
1✔
78
        if ar.Pull != nil {
2✔
79
                pullErr = ar.Pull.Start(ctx)
1✔
80
        }
1✔
81

82
        if pushErr != nil {
2✔
83
                return pushErr
1✔
84
        }
1✔
85

86
        if pullErr != nil {
2✔
87
                return pullErr
1✔
88
        }
1✔
89

90
        return nil
1✔
91
}
92

93
func (ar *ActiveReplicator) Stop() error {
1✔
94

1✔
95
        if ar.Push == nil && ar.Pull == nil {
1✔
96
                return fmt.Errorf("Attempted to stop activeReplicator for %s with neither Push nor Pull defined", base.UD(ar.ID))
×
97
        }
×
98

99
        var pushErr error
1✔
100
        if ar.Push != nil {
2✔
101
                pushErr = ar.Push.Stop()
1✔
102
        }
1✔
103

104
        var pullErr error
1✔
105
        if ar.Pull != nil {
2✔
106
                pullErr = ar.Pull.Stop()
1✔
107
        }
1✔
108

109
        if pushErr != nil {
1✔
110
                return pushErr
×
111
        }
×
112

113
        if pullErr != nil {
1✔
114
                return pullErr
×
115
        }
×
116

117
        if base.ValDefault(ar.config.reportHandlerPanicsOnStop, true) {
2✔
118
                if stats := ar.config.ReplicationStatsMap; stats != nil {
2✔
119
                        if val := stats.NumHandlersPanicked.Value(); val > 0 {
1✔
120
                                return fmt.Errorf("%d handlers panicked", val)
×
121
                        }
×
122
                }
123
        }
124

125
        return nil
1✔
126
}
127

128
func (ar *ActiveReplicator) Reset() error {
1✔
129
        var pushErr error
1✔
130
        if ar.Push != nil {
1✔
131
                pushErr = ar.Push.reset()
×
132
        }
×
133

134
        var pullErr error
1✔
135
        if ar.Pull != nil {
2✔
136
                pullErr = ar.Pull.reset()
1✔
137
        }
1✔
138

139
        if pushErr != nil {
1✔
140
                return pushErr
×
141
        }
×
142

143
        if pullErr != nil {
1✔
144
                return pullErr
×
145
        }
×
146

147
        if ar.config.ReplicationStatsMap != nil {
2✔
148
                ar.config.ReplicationStatsMap.Reset()
1✔
149
        }
1✔
150

151
        return nil
1✔
152
}
153

154
// _onReplicationComplete is invoked from Complete in an active replication.  If all replications
155
// associated with the ActiveReplicator are complete, onComplete is invoked
156
func (ar *ActiveReplicator) _onReplicationComplete() {
1✔
157
        allReplicationsComplete := true
1✔
158
        if ar.Push != nil && ar.Push.getState() != ReplicationStateStopped {
1✔
159
                allReplicationsComplete = false
×
160
        }
×
161
        if ar.Pull != nil && ar.Pull.getState() != ReplicationStateStopped {
2✔
162
                allReplicationsComplete = false
1✔
163
        }
1✔
164

165
        if allReplicationsComplete {
2✔
166
                ar.config.onComplete(ar.ID)
1✔
167
        }
1✔
168

169
}
170

171
func (ar *ActiveReplicator) State(ctx context.Context) (state string, errorMessage string) {
1✔
172

1✔
173
        state = ReplicationStateStopped
1✔
174
        if ar.Push != nil {
2✔
175
                state, errorMessage = ar.Push.getStateWithErrorMessage()
1✔
176
        }
1✔
177

178
        if ar.Pull != nil {
2✔
179
                pullState, pullErrorMessage := ar.Pull.getStateWithErrorMessage()
1✔
180
                state = combinedState(ctx, state, pullState)
1✔
181
                if pullErrorMessage != "" {
2✔
182
                        errorMessage = pullErrorMessage
1✔
183
                }
1✔
184
        }
185

186
        return state, errorMessage
1✔
187
}
188

189
func (ar *ActiveReplicator) GetStatus(ctx context.Context) *ReplicationStatus {
1✔
190

1✔
191
        status := &ReplicationStatus{
1✔
192
                ID: ar.ID,
1✔
193
        }
1✔
194
        status.Status, status.ErrorMessage = ar.State(ctx)
1✔
195

1✔
196
        if ar.Pull != nil {
2✔
197
                status.PullReplicationStatus = ar.Pull.GetStatus().PullReplicationStatus
1✔
198
        }
1✔
199

200
        if ar.Push != nil {
2✔
201
                status.PushReplicationStatus = ar.Push.GetStatus().PushReplicationStatus
1✔
202
        }
1✔
203

204
        return status
1✔
205
}
206

207
func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sender, bsc *BlipSyncContext, err error) {
1✔
208
        arc.replicationStats.NumConnectAttempts.Add(1)
1✔
209

1✔
210
        var originPatterns []string // no origin headers for ISGR
1✔
211
        cancelCtx, cancelFunc := context.WithCancel(context.Background())
1✔
212
        blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, cancelCtx)
1✔
213
        if err != nil {
1✔
214
                cancelFunc()
×
215
                return nil, nil, err
×
216
        }
×
217
        blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval
1✔
218
        blipContext.OnExitCallback = func() {
2✔
219
                // fall into a reconnect loop only if the connection is unexpectedly closed.
1✔
220
                if arc.ctx.Err() == nil {
2✔
221
                        arc.reconnect()
1✔
222
                }
1✔
223
        }
224

225
        bsc, err = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats, cancelFunc)
1✔
226
        if err != nil {
1✔
227
                return nil, nil, err
×
228
        }
×
229

230
        bsc.loggingCtx = base.CorrelationIDLogCtx(
1✔
231
                arc.config.ActiveDB.AddDatabaseLogContext(base.NewNonCancelCtx().Ctx),
1✔
232
                arc.config.ID+idSuffix)
1✔
233
        if arc.config.RunAs != "" {
2✔
234
                bsc.loggingCtx = base.UserLogCtx(bsc.loggingCtx, arc.config.RunAs, base.UserDomainSyncGateway, nil)
1✔
235
        } else {
2✔
236
                bsc.loggingCtx = arc.config.ActiveDB.AddBucketUserLogContext(bsc.loggingCtx)
1✔
237
        }
1✔
238

239
        // NewBlipSyncContext has already set deltas as disabled/enabled based on config.ActiveDB.
240
        // If deltas have been disabled in the replication config, override this value
241
        if arc.config.DeltasEnabled == false {
2✔
242
                bsc.sgCanUseDeltas = false
1✔
243
        }
1✔
244

245
        blipSender, err = blipSync(*arc.config.RemoteDBURL, blipContext, arc.config.InsecureSkipVerify)
1✔
246
        if err != nil {
2✔
247
                return nil, nil, err
1✔
248
        }
1✔
249

250
        // set active subprotocol after handshake
251
        err = bsc.SetActiveCBMobileSubprotocol(blipContext.ActiveSubprotocol())
1✔
252
        if err != nil {
1✔
253
                return nil, nil, err
×
254
        }
×
255

256
        return blipSender, bsc, nil
1✔
257
}
258

259
// blipSync opens a connection to the target, and returns a blip.Sender to send messages over.
260
func blipSync(target url.URL, blipContext *blip.Context, insecureSkipVerify bool) (*blip.Sender, error) {
1✔
261
        // GET target database endpoint to see if reachable for exit-early/clearer error message
1✔
262
        req, err := http.NewRequest(http.MethodGet, target.String(), nil)
1✔
263
        if err != nil {
1✔
264
                return nil, err
×
265
        }
×
266
        client := base.GetHttpClientForWebSocket(insecureSkipVerify)
1✔
267
        resp, err := client.Do(req)
1✔
268
        if err != nil {
2✔
269
                return nil, err
1✔
270
        }
1✔
271

272
        err = resp.Body.Close()
1✔
273
        if err != nil {
1✔
274
                return nil, err
×
275
        }
×
276

277
        if resp.StatusCode != http.StatusOK {
2✔
278
                return nil, fmt.Errorf("unexpected status code %d from target database", resp.StatusCode)
1✔
279
        }
1✔
280

281
        // switch to websocket protocol scheme
282
        if target.Scheme == "http" {
2✔
283
                target.Scheme = "ws"
1✔
284
        } else if target.Scheme == "https" {
3✔
285
                target.Scheme = "wss"
1✔
286
        }
1✔
287

288
        // Strip userinfo from the URL, don't need it because of the Basic auth header.
289
        var basicAuthCreds *url.Userinfo
1✔
290
        if target.User != nil {
2✔
291
                // take a copy
1✔
292
                if password, hasPassword := target.User.Password(); hasPassword {
2✔
293
                        basicAuthCreds = url.UserPassword(target.User.Username(), password)
1✔
294
                } else {
2✔
295
                        basicAuthCreds = url.User(target.User.Username())
1✔
296
                }
1✔
297
                target.User = nil
1✔
298
        }
299

300
        config := blip.DialOptions{
1✔
301
                URL:        target.String() + "/_blipsync?" + BLIPSyncClientTypeQueryParam + "=" + string(BLIPClientTypeSGR2),
1✔
302
                HTTPClient: client,
1✔
303
        }
1✔
304

1✔
305
        if basicAuthCreds != nil {
2✔
306
                config.HTTPHeader = http.Header{
1✔
307
                        "Authorization": []string{"Basic " + base64UserInfo(basicAuthCreds)},
1✔
308
                }
1✔
309
        }
1✔
310

311
        return blipContext.DialConfig(&config)
1✔
312
}
313

314
// base64UserInfo returns the base64 encoded version of the given UserInfo.
315
// Can't use i.String() here because that returns URL encoded versions of credentials.
316
func base64UserInfo(i *url.Userinfo) string {
1✔
317
        password, _ := i.Password()
1✔
318
        return base64.StdEncoding.EncodeToString([]byte(i.Username() + ":" + password))
1✔
319
}
1✔
320

321
// combinedState reports a combined replication state for a pushAndPull
322
// replication, based on the following criteria:
323
//   - if either replication is in error, return error
324
//   - if either replication is running, return running
325
//   - if both replications are stopped, return stopped
326
func combinedState(ctx context.Context, state1, state2 string) (combinedState string) {
1✔
327
        if state1 == "" {
1✔
328
                return state2
×
329
        }
×
330
        if state2 == "" {
1✔
331
                return state1
×
332
        }
×
333

334
        if state1 == ReplicationStateStopped && state2 == ReplicationStateStopped {
2✔
335
                return ReplicationStateStopped
1✔
336
        }
1✔
337

338
        if state1 == ReplicationStateRunning || state2 == ReplicationStateRunning {
2✔
339
                return ReplicationStateRunning
1✔
340
        }
1✔
341

342
        if state1 == ReplicationStateError || state2 == ReplicationStateError {
2✔
343
                return ReplicationStateError
1✔
344
        }
1✔
345

346
        if state1 == ReplicationStateReconnecting || state2 == ReplicationStateReconnecting {
2✔
347
                return ReplicationStateReconnecting
1✔
348
        }
1✔
349

350
        base.InfofCtx(ctx, base.KeyReplicate, "Unhandled combination of replication states (%s, %s), returning %s", state1, state2, state1)
×
351
        return state1
×
352
}
353

354
func (ar *ActiveReplicator) purgeCheckpoints() {
1✔
355
        if ar.Pull != nil {
2✔
356
                _ = ar.Pull.reset()
1✔
357
        }
1✔
358

359
        if ar.Push != nil {
2✔
360
                _ = ar.Push.reset()
1✔
361
        }
1✔
362
}
363

364
// LoadReplicationStatus attempts to load both push and pull replication checkpoints, and constructs the combined status
365
func LoadReplicationStatus(ctx context.Context, dbContext *DatabaseContext, replicationID string) (status *ReplicationStatus, err error) {
1✔
366

1✔
367
        status = &ReplicationStatus{
1✔
368
                ID: replicationID,
1✔
369
        }
1✔
370

1✔
371
        pullStatusKey := dbContext.MetadataKeys.ReplicationStatusKey(PullCheckpointID(replicationID))
1✔
372
        pullStatus, _ := getLocalStatus(ctx, dbContext.MetadataStore, pullStatusKey)
1✔
373
        if pullStatus != nil {
2✔
374
                status.PullReplicationStatus = pullStatus.PullReplicationStatus
1✔
375
                status.Status = pullStatus.Status
1✔
376
                status.ErrorMessage = pullStatus.ErrorMessage
1✔
377
                status.LastSeqPull = pullStatus.LastSeqPull
1✔
378
        }
1✔
379

380
        pushStatusKey := dbContext.MetadataKeys.ReplicationStatusKey(PushCheckpointID(replicationID))
1✔
381
        pushStatus, _ := getLocalStatus(ctx, dbContext.MetadataStore, pushStatusKey)
1✔
382
        if pushStatus != nil {
2✔
383
                status.PushReplicationStatus = pushStatus.PushReplicationStatus
1✔
384
                status.Status = pushStatus.Status
1✔
385
                status.ErrorMessage = pushStatus.ErrorMessage
1✔
386
                status.LastSeqPush = pushStatus.LastSeqPush
1✔
387
        }
1✔
388

389
        if pullStatus == nil && pushStatus == nil {
2✔
390
                return nil, errors.New("Replication status not found")
1✔
391
        }
1✔
392

393
        return status, nil
1✔
394
}
395

396
func PushCheckpointID(replicationID string) string {
1✔
397
        return "sgr2cp:push:" + replicationID
1✔
398
}
1✔
399

400
func PullCheckpointID(replicationID string) string {
1✔
401
        return "sgr2cp:pull:" + replicationID
1✔
402
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc