• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 457

13 Jun 2025 04:26PM UTC coverage: 64.244% (+0.04%) from 64.207%
457

push

jenkins

web-flow
Fix Sync Gateway 3.3 index documentation (#7584)

36658 of 57061 relevant lines covered (64.24%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.51
/db/active_replicator_push.go
1
/*
2
Copyright 2020-Present Couchbase, Inc.
3

4
Use of this software is governed by the Business Source License included in
5
the file licenses/BSL-Couchbase.txt.  As of the Change Date specified in that
6
file, in accordance with the Business Source License, use of this software will
7
be governed by the Apache License, Version 2.0, included in the file
8
licenses/APL2.txt.
9
*/
10

11
package db
12

13
import (
14
        "context"
15
        "errors"
16
        "strings"
17
        "sync/atomic"
18
        "time"
19

20
        "github.com/couchbase/go-blip"
21
        "github.com/couchbase/sync_gateway/base"
22
)
23

24
// ActivePushReplicator is a unidirectional push active replicator.
25
type ActivePushReplicator struct {
26
        *activeReplicatorCommon
27
}
28

29
// NewPushReplicator creates an ISGR push replicator.
30
func NewPushReplicator(ctx context.Context, config *ActiveReplicatorConfig) (*ActivePushReplicator, error) {
1✔
31
        replicator, err := newActiveReplicatorCommon(ctx, config, ActiveReplicatorTypePush)
1✔
32
        if err != nil {
2✔
33
                return nil, err
1✔
34
        }
1✔
35
        apr := ActivePushReplicator{
1✔
36
                activeReplicatorCommon: replicator,
1✔
37
        }
1✔
38
        replicator.registerFunctions(apr._getStatus, apr._connect, apr.registerCheckpointerCallbacks)
1✔
39
        return &apr, nil
1✔
40
}
41

42
// PreHydrogenTargetAllowConflictsError is recorded as the replicator's error by the
// fatal error callback installed in _startSendingChanges, when the fatal error text
// contains ErrUseProposeChanges.Message.
var PreHydrogenTargetAllowConflictsError = errors.New("cannot run replication to target with allow_conflicts=false. Change to allow_conflicts=true or upgrade to 2.8")
43

44
// _connect opens up a connection, and starts replicating.
45
func (apr *ActivePushReplicator) _connect() error {
1✔
46
        var err error
1✔
47
        apr.blipSender, apr.blipSyncContext, err = connect(apr.activeReplicatorCommon, "-push")
1✔
48
        if err != nil {
2✔
49
                return err
1✔
50
        }
1✔
51

52
        // TODO: If this were made a config option, and the default conflict resolver not enforced on
53
        //         the pull side, it would be feasible to run sgr-2 in 'manual conflict resolution' mode
54
        apr.blipSyncContext.sendRevNoConflicts = true
1✔
55

1✔
56
        if apr.config.CollectionsEnabled {
2✔
57
                if err := apr._startPushWithCollections(); err != nil {
2✔
58
                        return err
1✔
59
                }
1✔
60
        } else {
×
61
                // for backwards compatibility use no collection-specific handling/messages
×
62
                if err := apr._startPushNonCollection(); err != nil {
×
63
                        return err
×
64
                }
×
65
        }
66

67
        apr.setState(ReplicationStateRunning)
1✔
68
        return nil
1✔
69
}
70

71
// Complete gracefully shuts down a replication, waiting for all in-flight revisions to be processed
72
// before stopping the replication
73
func (apr *ActivePushReplicator) Complete() {
1✔
74
        base.TracefCtx(apr.ctx, base.KeyReplicate, "ActivePushReplicator.Complete()")
1✔
75
        apr.lock.Lock()
1✔
76

1✔
77
        // Wait for any pending changes responses to arrive and be processed
1✔
78
        err := apr._waitForPendingChangesResponse()
1✔
79
        if err != nil {
1✔
80
                base.InfofCtx(apr.ctx, base.KeyReplicate, "Timeout waiting for pending changes response for replication %s - stopping: %v", apr.config.ID, err)
×
81
        }
×
82

83
        _ = apr.forEachCollection(func(c *activeReplicatorCollection) error {
2✔
84
                if err := c.Checkpointer.waitForExpectedSequences(); err != nil {
1✔
85
                        base.InfofCtx(apr.ctx, base.KeyReplicate, "Timeout draining replication %s - stopping: %v", apr.config.ID, err)
×
86
                }
×
87
                return nil
1✔
88
        })
89

90
        apr._stop()
1✔
91

1✔
92
        stopErr := apr._disconnect()
1✔
93
        if stopErr != nil {
1✔
94
                base.InfofCtx(apr.ctx, base.KeyReplicate, "Error attempting to stop replication %s: %v", apr.config.ID, stopErr)
×
95
        }
×
96
        apr.setState(ReplicationStateStopped)
1✔
97

1✔
98
        // unlock the replication before triggering callback, in case callback attempts to re-acquire the lock
1✔
99
        onCompleteCallback := apr.onReplicatorComplete
1✔
100
        apr._publishStatus()
1✔
101
        apr.lock.Unlock()
1✔
102

1✔
103
        if onCompleteCallback != nil {
2✔
104
                onCompleteCallback()
1✔
105
        }
1✔
106
}
107

108
// _getStatus returns current replicator status. Requires holding ActivePushReplicator.lock as a read lock.
109
func (apr *ActivePushReplicator) _getStatus() *ReplicationStatus {
1✔
110
        status := &ReplicationStatus{}
1✔
111
        status.Status, status.ErrorMessage = apr.getStateWithErrorMessage()
1✔
112

1✔
113
        pushStats := apr.replicationStats
1✔
114
        status.DocsWritten = pushStats.SendRevCount.Value()
1✔
115
        status.DocsCheckedPush = pushStats.SendChangesCount.Value()
1✔
116
        status.DocWriteFailures = pushStats.SendRevErrorTotal.Value()
1✔
117
        status.DocWriteConflict = pushStats.SendRevErrorConflictCount.Value()
1✔
118
        status.RejectedRemote = pushStats.SendRevErrorRejectedCount.Value()
1✔
119
        status.DeltasSent = pushStats.SendRevDeltaSentCount.Value()
1✔
120
        status.LastSeqPush = apr.getCheckpointHighSeq()
1✔
121
        if apr.initialStatus != nil {
2✔
122
                status.PushReplicationStatus.Add(apr.initialStatus.PushReplicationStatus)
1✔
123
        }
1✔
124
        return status
1✔
125
}
126

127
// registerCheckpointerCallbacks registers appropriate callback functions for checkpointing.
128
func (apr *ActivePushReplicator) registerCheckpointerCallbacks(c *activeReplicatorCollection) error {
1✔
129
        blipSyncContextCollection, err := apr.blipSyncContext.collections.get(c.collectionIdx)
1✔
130
        if err != nil {
1✔
131
                base.WarnfCtx(apr.ctx, "Unable to get blipSyncContextCollection for collection %v", c.collectionIdx)
×
132
                return err
×
133
        }
×
134

135
        blipSyncContextCollection.sgr2PushAlreadyKnownSeqsCallback = c.Checkpointer.AddAlreadyKnownSeq
1✔
136
        blipSyncContextCollection.sgr2PushAddExpectedSeqsCallback = c.Checkpointer.AddExpectedSeqs
1✔
137
        blipSyncContextCollection.sgr2PushProcessedSeqCallback = c.Checkpointer.AddProcessedSeq
1✔
138

1✔
139
        return nil
1✔
140
}
141

142
// waitForExpectedSequences waits for the pending changes response count
143
// to drain to zero.  Intended to be used once the replication has been stopped, to wait for
144
// in-flight changes responses to arrive.
145
// Waits up to 10s, polling every 100ms.
146
func (apr *ActivePushReplicator) _waitForPendingChangesResponse() error {
1✔
147
        waitCount := 0
1✔
148
        for waitCount < 100 {
2✔
149
                if apr.blipSyncContext == nil {
1✔
150
                        return nil
×
151
                }
×
152
                pendingCount := atomic.LoadInt64(&apr.blipSyncContext.changesPendingResponseCount)
1✔
153
                if pendingCount <= 0 {
2✔
154
                        return nil
1✔
155
                }
1✔
156
                time.Sleep(100 * time.Millisecond)
1✔
157
                waitCount++
1✔
158
        }
159
        return errors.New("checkpointer _waitForPendingChangesResponse failed to complete after waiting 10s")
×
160
}
161

162
// Stop stops the push replication and waits for the send changes goroutine to finish.
163
func (apr *ActivePushReplicator) Stop() error {
1✔
164
        if err := apr.stopAndDisconnect(); err != nil {
1✔
165
                return err
×
166
        }
×
167
        teardownStart := time.Now()
1✔
168
        for apr.activeSendChanges.Load() != 0 && (time.Since(teardownStart) < time.Second*10) {
2✔
169
                time.Sleep(10 * time.Millisecond)
1✔
170
        }
1✔
171
        return nil
1✔
172
}
173

174
func (apr *ActivePushReplicator) _startPushNonCollection() error {
×
175
        dbCollection, err := apr.config.ActiveDB.GetDefaultDatabaseCollection()
×
176
        if err != nil {
×
177
                return err
×
178
        }
×
179
        apr.blipSyncContext.collections.setNonCollectionAware(newBlipSyncCollectionContext(apr.ctx, dbCollection))
×
180

×
181
        if err := apr._initCheckpointer(nil); err != nil {
×
182
                // clean up anything we've opened so far
×
183
                base.TracefCtx(apr.ctx, base.KeyReplicate, "Error initialising checkpoint in _connect. Closing everything.")
×
184
                apr.checkpointerCtx = nil
×
185
                apr.blipSender.Close()
×
186
                apr.blipSyncContext.Close()
×
187
                return err
×
188
        }
×
189

190
        dbCollectionWithUser := &DatabaseCollectionWithUser{
×
191
                DatabaseCollection: dbCollection,
×
192
                user:               apr.config.ActiveDB.user,
×
193
        }
×
194
        bh := newBlipHandler(apr.ctx, apr.blipSyncContext, apr.config.ActiveDB, apr.blipSyncContext.incrementSerialNumber())
×
195
        bh.collection = dbCollectionWithUser
×
196
        bh.loggingCtx = bh.collection.AddCollectionContext(bh.BlipSyncContext.loggingCtx)
×
197

×
198
        return apr._startSendingChanges(bh, apr.defaultCollection.Checkpointer.lastCheckpointSeq)
×
199
}
200

201
// _startSendingChanges starts a changes feed for a given collection in a goroutine and starts sending changes to the passive peer from a starting sequence value.
202
func (apr *ActivePushReplicator) _startSendingChanges(bh *blipHandler, since SequenceID) error {
1✔
203
        collectionCtx, err := bh.collections.get(bh.collectionIdx)
1✔
204
        if err != nil {
1✔
205
                return err
×
206
        }
×
207
        var channels base.Set
1✔
208
        if filteredChannels := apr.config.getFilteredChannels(bh.collectionIdx); len(filteredChannels) > 0 {
2✔
209
                channels = base.SetFromArray(filteredChannels)
1✔
210
        }
1✔
211

212
        apr.blipSyncContext.fatalErrorCallback = func(err error) {
1✔
213
                if strings.Contains(err.Error(), ErrUseProposeChanges.Message) {
×
214
                        err = ErrUseProposeChanges
×
215
                        apr.setError(PreHydrogenTargetAllowConflictsError)
×
216
                        err = apr.stopAndDisconnect()
×
217
                        if err != nil {
×
218
                                base.ErrorfCtx(apr.ctx, "Failed to stop and disconnect replication: %v", err)
×
219
                        }
×
220
                } else if strings.Contains(err.Error(), ErrDatabaseWentAway.Message) {
×
221
                        err = apr.disconnect()
×
222
                        if err != nil {
×
223
                                base.ErrorfCtx(apr.ctx, "Failed to disconnect replication after database went away: %v", err)
×
224
                        }
×
225
                }
226
                // No special handling for error
227
        }
228

229
        apr.activeSendChanges.Add(1)
1✔
230
        go func(s *blip.Sender) {
2✔
231
                defer apr.activeSendChanges.Add(-1)
1✔
232
                isComplete, err := bh.sendChanges(s, &sendChangesOptions{
1✔
233
                        docIDs:            apr.config.DocIDs,
1✔
234
                        since:             since,
1✔
235
                        continuous:        apr.config.Continuous,
1✔
236
                        activeOnly:        apr.config.ActiveOnly,
1✔
237
                        batchSize:         int(apr.config.ChangesBatchSize),
1✔
238
                        revocations:       apr.config.PurgeOnRemoval,
1✔
239
                        channels:          channels,
1✔
240
                        clientType:        clientTypeSGR2,
1✔
241
                        ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode.
1✔
242
                        changesCtx:        collectionCtx.changesCtx,
1✔
243
                })
1✔
244
                if err != nil {
2✔
245
                        base.InfofCtx(apr.ctx, base.KeyReplicate, "Terminating blip connection due to changes feed error: %v", err)
1✔
246
                        bh.ctxCancelFunc()
1✔
247
                        apr.setError(err)
1✔
248
                        apr.publishStatus()
1✔
249
                        return
1✔
250
                }
1✔
251
                if isComplete {
2✔
252
                        // On a normal completion, call complete for the replication
1✔
253
                        apr.Complete()
1✔
254
                }
1✔
255
        }(apr.blipSender)
256
        return nil
1✔
257
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc