• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

couchbase / sync_gateway / 433

05 Jun 2025 03:32PM UTC coverage: 64.334% (+0.03%) from 64.302%
433

push

jenkins

web-flow
Docs/API: Fix outdated 'keyspace_map' ISGR collections config reference (#7561)

36770 of 57155 relevant lines covered (64.33%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.83
/db/blip_handler.go
1
/*
2
Copyright 2020-Present Couchbase, Inc.
3

4
Use of this software is governed by the Business Source License included in
5
the file licenses/BSL-Couchbase.txt.  As of the Change Date specified in that
6
file, in accordance with the Business Source License, use of this software will
7
be governed by the Apache License, Version 2.0, included in the file
8
licenses/APL2.txt.
9
*/
10

11
package db
12

13
import (
14
        "bytes"
15
        "context"
16
        "encoding/json"
17
        "fmt"
18
        "net/http"
19
        "runtime/debug"
20
        "strconv"
21
        "strings"
22
        "sync/atomic"
23
        "time"
24

25
        "github.com/couchbase/go-blip"
26
        sgbucket "github.com/couchbase/sg-bucket"
27
        "github.com/couchbase/sync_gateway/base"
28
        "github.com/couchbase/sync_gateway/channels"
29
)
30

31
// handlersByProfile defines the routes for each message profile (verb) of an incoming request to the function that handles it.
32
var handlersByProfile = map[string]blipHandlerFunc{
33
        MessageGetCheckpoint:   collectionBlipHandler((*blipHandler).handleGetCheckpoint),
34
        MessageSetCheckpoint:   collectionBlipHandler((*blipHandler).handleSetCheckpoint),
35
        MessageSubChanges:      userBlipHandler(collectionBlipHandler((*blipHandler).handleSubChanges)),
36
        MessageUnsubChanges:    userBlipHandler(collectionBlipHandler((*blipHandler).handleUnsubChanges)),
37
        MessageChanges:         userBlipHandler(collectionBlipHandler((*blipHandler).handleChanges)),
38
        MessageRev:             userBlipHandler(collectionBlipHandler((*blipHandler).handleRev)),
39
        MessageNoRev:           collectionBlipHandler((*blipHandler).handleNoRev),
40
        MessageGetAttachment:   userBlipHandler(collectionBlipHandler((*blipHandler).handleGetAttachment)),
41
        MessageProveAttachment: userBlipHandler(collectionBlipHandler((*blipHandler).handleProveAttachment)),
42
        MessageProposeChanges:  collectionBlipHandler((*blipHandler).handleProposeChanges),
43
        MessageGetRev:          userBlipHandler(collectionBlipHandler((*blipHandler).handleGetRev)),
44
        MessagePutRev:          userBlipHandler(collectionBlipHandler((*blipHandler).handlePutRev)),
45

46
        MessageGetCollections: userBlipHandler((*blipHandler).handleGetCollections),
47
}
48

49
var kConnectedClientHandlersByProfile = map[string]blipHandlerFunc{
50
        MessageFunction: userBlipHandler((*blipHandler).handleFunction),
51
}
52

53
// Replication throttling
54
const (
55
        // DefaultMaxConcurrentChangesBatches is the maximum number of in-flight changes batches a client is allowed to send concurrently without being throttled.
56
        DefaultMaxConcurrentChangesBatches = 2
57
        // DefaultMaxConcurrentRevs is the maximum number of in-flight revisions a client is allowed to send or receive concurrently without being throttled.
58
        DefaultMaxConcurrentRevs = 5
59
)
60

61
type blipHandler struct {
62
        *BlipSyncContext
63
        db            *Database                   // Handler-specific copy of the BlipSyncContext's blipContextDb
64
        collection    *DatabaseCollectionWithUser // Handler-specific copy of the BlipSyncContext's collection specific DB
65
        collectionCtx *blipSyncCollectionContext  // Sync-specific data for this collection
66
        collectionIdx *int                        // index into BlipSyncContext.collectionMapping for the collection
67
        loggingCtx    context.Context             // inherited from BlipSyncContext.loggingCtx with additional handler-only information (like keyspace)
68
        serialNumber  uint64                      // This blip handler's serial number to differentiate logs w/ other handlers
69
}
70

71
func newBlipHandler(ctx context.Context, bc *BlipSyncContext, db *Database, serialNumber uint64) *blipHandler {
1✔
72
        return &blipHandler{
1✔
73
                BlipSyncContext: bc,
1✔
74
                db:              db,
1✔
75
                loggingCtx:      ctx,
1✔
76
                serialNumber:    serialNumber,
1✔
77
        }
1✔
78
}
1✔
79

80
// BlipSyncContextClientType represents whether to replicate to another Sync Gateway or Couchbase Lite
81
type BLIPSyncContextClientType string
82

83
const (
84
        BLIPCorrelationIDResponseHeader = "X-Correlation-ID"
85

86
        BLIPSyncClientTypeQueryParam = "client"
87

88
        BLIPClientTypeCBL2 BLIPSyncContextClientType = "cbl2"
89
        BLIPClientTypeSGR2 BLIPSyncContextClientType = "sgr2"
90
)
91

92
type blipHandlerFunc func(*blipHandler, *blip.Message) error
93

94
var (
95
        // CBLReconnectErrorCode is the error code that CBL will use to trigger a reconnect
96
        CBLReconnectErrorCode = http.StatusServiceUnavailable
97

98
        ErrUseProposeChanges = base.HTTPErrorf(http.StatusConflict, "Use 'proposeChanges' instead")
99

100
        // ErrDatabaseWentAway is returned when a replication tries to use a closed database.
101
        // HTTP 503 tells the client to reconnect and try again.
102
        ErrDatabaseWentAway = base.HTTPErrorf(http.StatusServiceUnavailable, "Sync Gateway database went away - asking client to reconnect")
103

104
        // ErrAttachmentNotFound is returned when the attachment that is asked by one of the peers does
105
        // not exist in another to prove that it has the attachment during Inter-Sync Gateway Replication.
106
        ErrAttachmentNotFound = base.HTTPErrorf(http.StatusNotFound, "attachment not found")
107
)
108

109
// userBlipHandler wraps another blip handler with code that reloads the user object when the user
110
// or the user's roles have changed, to make sure that the replication has the latest channel access grants.
111
// Uses a userChangeWaiter to detect changes to the user or roles.  Note that in the case of a pushed document
112
// triggering a user access change, this happens at write time (via MarkPrincipalsChanged), and doesn't
113
// depend on the userChangeWaiter.
114
func userBlipHandler(next blipHandlerFunc) blipHandlerFunc {
1✔
115
        return func(bh *blipHandler, bm *blip.Message) error {
2✔
116

1✔
117
                // Reload user if it has changed
1✔
118
                if err := bh.refreshUser(); err != nil {
1✔
119
                        return err
×
120
                }
×
121
                // Call down to the underlying handler and return it's value
122
                return next(bh, bm)
1✔
123
        }
124
}
125

126
func (bh *blipHandler) refreshUser() error {
1✔
127

1✔
128
        bc := bh.BlipSyncContext
1✔
129
        if bc.userName != "" {
2✔
130
                // Check whether user needs to be refreshed
1✔
131
                bc.dbUserLock.Lock()
1✔
132
                defer bc.dbUserLock.Unlock()
1✔
133
                userChanged := bc.userChangeWaiter.RefreshUserCount()
1✔
134

1✔
135
                // If changed, refresh the user and db while holding the lock
1✔
136
                if userChanged {
2✔
137
                        // Refresh the BlipSyncContext database
1✔
138
                        err := bc.blipContextDb.ReloadUser(bh.loggingCtx)
1✔
139
                        if err != nil {
1✔
140
                                return base.NewHTTPError(CBLReconnectErrorCode, err.Error())
×
141
                        }
×
142
                        newUser := bc.blipContextDb.User()
1✔
143
                        newUser.InitializeRoles()
1✔
144
                        bc.userChangeWaiter.RefreshUserKeys(newUser, bc.blipContextDb.MetadataKeys)
1✔
145
                        // refresh the handler's database with the new BlipSyncContext database
1✔
146
                        bh.db = bh._copyContextDatabase()
1✔
147
                        if bh.collection != nil {
2✔
148
                                bh.collection = &DatabaseCollectionWithUser{
1✔
149
                                        DatabaseCollection: bh.collection.DatabaseCollection,
1✔
150
                                        user:               bh.db.User(),
1✔
151
                                }
1✔
152
                        }
1✔
153
                }
154
        }
155
        return nil
1✔
156
}
157

158
// collectionBlipHandler wraps another blip handler to specify a collection
159
func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc {
1✔
160
        return func(bh *blipHandler, bm *blip.Message) error {
2✔
161
                collectionIndexStr, ok := bm.Properties[BlipCollection]
1✔
162
                if !ok {
2✔
163
                        if !bh.db.HasDefaultCollection() {
2✔
164
                                return base.HTTPErrorf(http.StatusBadRequest, "Collection property not specified and default collection is not configured for this database")
1✔
165
                        }
1✔
166
                        if bh.collections.hasNamedCollections() {
1✔
167
                                return base.HTTPErrorf(http.StatusBadRequest, "GetCollections already occurred, subsequent messages need a Collection property")
×
168
                        }
×
169
                        var err error
1✔
170
                        bh.collection, err = bh.db.GetDefaultDatabaseCollectionWithUser()
1✔
171
                        if err != nil {
1✔
172
                                return err
×
173
                        }
×
174
                        bh.collectionCtx, err = bh.collections.get(nil)
1✔
175
                        if err != nil {
2✔
176
                                bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection))
1✔
177
                                bh.collectionCtx, _ = bh.collections.get(nil)
1✔
178
                        }
1✔
179
                        bh.loggingCtx = bh.collection.AddCollectionContext(bh.BlipSyncContext.loggingCtx)
1✔
180
                        return next(bh, bm)
1✔
181
                }
182
                if !bh.collections.hasNamedCollections() {
2✔
183
                        return base.HTTPErrorf(http.StatusBadRequest, "Passing collection requires calling %s first", MessageGetCollections)
1✔
184
                }
1✔
185

186
                collectionIndex, err := strconv.Atoi(collectionIndexStr)
1✔
187
                if err != nil {
1✔
188
                        return base.HTTPErrorf(http.StatusBadRequest, "collection property needs to be an int, was %q", collectionIndexStr)
×
189
                }
×
190

191
                bh.collectionIdx = &collectionIndex
1✔
192
                bh.collectionCtx, err = bh.collections.get(&collectionIndex)
1✔
193
                if err != nil {
2✔
194
                        return base.HTTPErrorf(http.StatusBadRequest, "%s", err)
1✔
195
                }
1✔
196
                bh.collection = &DatabaseCollectionWithUser{
1✔
197
                        DatabaseCollection: bh.collectionCtx.dbCollection,
1✔
198
                        user:               bh.db.user,
1✔
199
                }
1✔
200
                bh.loggingCtx = bh.collection.AddCollectionContext(bh.BlipSyncContext.loggingCtx)
1✔
201
                // Call down to the underlying handler and return it's value
1✔
202
                return next(bh, bm)
1✔
203
        }
204
}
205

206
// ////// CHECKPOINTS
207

208
// Received a "getCheckpoint" request
209
func (bh *blipHandler) handleGetCheckpoint(rq *blip.Message) error {
1✔
210

1✔
211
        client := rq.Properties[BlipClient]
1✔
212
        bh.logEndpointEntry(rq.Profile(), fmt.Sprintf("Client:%s", client))
1✔
213

1✔
214
        response := rq.Response()
1✔
215
        if response == nil {
1✔
216
                return nil
×
217
        }
×
218

219
        value, err := bh.collection.GetSpecial(DocTypeLocal, CheckpointDocIDPrefix+client)
1✔
220
        if err != nil {
2✔
221
                return err
1✔
222
        }
1✔
223
        if value == nil {
1✔
224
                return base.NewHTTPError(http.StatusNotFound, http.StatusText(http.StatusNotFound))
×
225
        }
×
226
        response.Properties[GetCheckpointResponseRev] = value[BodyRev].(string)
1✔
227
        delete(value, BodyRev)
1✔
228
        delete(value, BodyId)
1✔
229
        // TODO: Marshaling here when we could use raw bytes all the way from the bucket
1✔
230
        _ = response.SetJSONBody(value)
1✔
231
        return nil
1✔
232
}
233

234
// Received a "setCheckpoint" request
235
func (bh *blipHandler) handleSetCheckpoint(rq *blip.Message) error {
1✔
236

1✔
237
        checkpointMessage := SetCheckpointMessage{rq}
1✔
238
        bh.logEndpointEntry(rq.Profile(), checkpointMessage.String())
1✔
239

1✔
240
        var checkpoint Body
1✔
241
        if err := checkpointMessage.ReadJSONBody(&checkpoint); err != nil {
1✔
242
                return err
×
243
        }
×
244
        if revID := checkpointMessage.rev(); revID != "" {
2✔
245
                checkpoint[BodyRev] = revID
1✔
246
        }
1✔
247
        revID, _, err := bh.collection.PutSpecial(DocTypeLocal, CheckpointDocIDPrefix+checkpointMessage.client(), checkpoint)
1✔
248
        if err != nil {
2✔
249
                return err
1✔
250
        }
1✔
251

252
        checkpointResponse := SetCheckpointResponse{checkpointMessage.Response()}
1✔
253
        checkpointResponse.setRev(revID)
1✔
254

1✔
255
        return nil
1✔
256
}
257

258
// ////// CHANGES
259

260
// Received a "subChanges" subscription request
261
func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
1✔
262
        latestSeq := func() (SequenceID, error) {
1✔
263
                seq, err := bh.collection.LastSequence(bh.loggingCtx)
×
264
                return SequenceID{Seq: seq}, err
×
265
        }
×
266
        subChangesParams, err := NewSubChangesParams(bh.loggingCtx, rq, latestSeq, bh.db.Options.ChangesRequestPlus)
1✔
267
        if err != nil {
1✔
268
                return base.HTTPErrorf(http.StatusBadRequest, "Invalid subChanges parameters")
×
269
        }
×
270

271
        // Ensure that only _one_ subChanges subscription can be open on this blip connection at any given time.  SG #3222.
272
        collectionCtx := bh.collectionCtx
1✔
273
        collectionCtx.changesCtxLock.Lock()
1✔
274
        defer collectionCtx.changesCtxLock.Unlock()
1✔
275
        if !collectionCtx.activeSubChanges.CASRetry(false, true) {
2✔
276
                collectionStr := "default collection"
1✔
277
                if bh.collectionIdx != nil {
1✔
278
                        collectionStr = fmt.Sprintf("collection %d", *bh.collectionIdx)
×
279
                }
×
280
                return fmt.Errorf("blipHandler for %s already has an outstanding subChanges. Cannot open another one", collectionStr)
1✔
281
        }
282

283
        // Create ctx if it has been cancelled
284
        if collectionCtx.changesCtx.Err() != nil {
2✔
285
                collectionCtx.changesCtx, collectionCtx.changesCtxCancel = context.WithCancel(bh.loggingCtx)
1✔
286
        }
1✔
287

288
        if len(subChangesParams.docIDs()) > 0 && subChangesParams.continuous() {
1✔
289
                return base.HTTPErrorf(http.StatusBadRequest, "DocIDs filter not supported for continuous subChanges")
×
290
        }
×
291

292
        bh.logEndpointEntry(rq.Profile(), subChangesParams.String())
1✔
293

1✔
294
        var channels base.Set
1✔
295
        if filter := subChangesParams.filter(); filter == base.ByChannelFilter {
2✔
296
                var err error
1✔
297

1✔
298
                channels, err = subChangesParams.channelsExpandedSet()
1✔
299
                if err != nil {
1✔
300
                        return base.HTTPErrorf(http.StatusBadRequest, "%s", err)
×
301
                } else if len(channels) == 0 {
1✔
302
                        return base.HTTPErrorf(http.StatusBadRequest, "Empty channel list")
×
303
                }
×
304
        } else if filter != "" {
1✔
305
                return base.HTTPErrorf(http.StatusBadRequest, "Unknown filter; try sync_gateway/bychannel")
×
306
        }
×
307

308
        collectionCtx.channels = channels
1✔
309

1✔
310
        clientType := clientTypeCBL2
1✔
311
        if rq.Properties["client_sgr2"] == trueProperty {
2✔
312
                clientType = clientTypeSGR2
1✔
313
        }
1✔
314

315
        continuous := subChangesParams.continuous()
1✔
316

1✔
317
        requestPlusSeq := uint64(0)
1✔
318
        // If non-continuous, check whether requestPlus handling is set for request or via database config
1✔
319
        if continuous == false {
2✔
320
                useRequestPlus := subChangesParams.requestPlus()
1✔
321
                if useRequestPlus {
2✔
322
                        seq, requestPlusErr := bh.db.GetRequestPlusSequence()
1✔
323
                        if requestPlusErr != nil {
1✔
324
                                return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve current sequence for requestPlus=true: %v", requestPlusErr)
×
325
                        }
×
326
                        requestPlusSeq = seq
1✔
327
                }
328
        }
329

330
        bh.collectionCtx.sendReplacementRevs = subChangesParams.sendReplacementRevs()
1✔
331

1✔
332
        // Start asynchronous changes goroutine
1✔
333
        go func() {
2✔
334
                // Pull replication stats by type
1✔
335
                if continuous {
2✔
336
                        bh.replicationStats.SubChangesContinuousActive.Add(1)
1✔
337
                        defer bh.replicationStats.SubChangesContinuousActive.Add(-1)
1✔
338
                        bh.replicationStats.SubChangesContinuousTotal.Add(1)
1✔
339
                } else {
2✔
340
                        bh.replicationStats.SubChangesOneShotActive.Add(1)
1✔
341
                        defer bh.replicationStats.SubChangesOneShotActive.Add(-1)
1✔
342
                        bh.replicationStats.SubChangesOneShotTotal.Add(1)
1✔
343
                }
1✔
344

345
                defer func() {
2✔
346
                        collectionCtx.changesCtxCancel()
1✔
347
                        collectionCtx.activeSubChanges.Set(false)
1✔
348
                }()
1✔
349
                // sendChanges runs until blip context closes, or fails due to error
350
                startTime := time.Now()
1✔
351
                _, err = bh.sendChanges(rq.Sender, &sendChangesOptions{
1✔
352
                        docIDs:            subChangesParams.docIDs(),
1✔
353
                        since:             subChangesParams.Since(),
1✔
354
                        continuous:        continuous,
1✔
355
                        activeOnly:        subChangesParams.activeOnly(),
1✔
356
                        batchSize:         subChangesParams.batchSize(),
1✔
357
                        channels:          collectionCtx.channels,
1✔
358
                        revocations:       subChangesParams.revocations(),
1✔
359
                        clientType:        clientType,
1✔
360
                        ignoreNoConflicts: clientType == clientTypeSGR2, // force this side to accept a "changes" message, even in no conflicts mode for SGR2.
1✔
361
                        changesCtx:        collectionCtx.changesCtx,
1✔
362
                        requestPlusSeq:    requestPlusSeq,
1✔
363
                })
1✔
364
                if err != nil {
2✔
365
                        base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "Closing blip connection due to changes feed error %+v\n", err)
1✔
366
                        bh.ctxCancelFunc()
1✔
367
                }
1✔
368
                base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s   --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime))
1✔
369
        }()
370

371
        auditFields := base.AuditFields{
1✔
372
                base.AuditFieldSince: subChangesParams.Since().String(),
1✔
373
        }
1✔
374
        if subChangesParams.filter() != "" {
2✔
375
                auditFields[base.AuditFieldFilter] = subChangesParams.filter()
1✔
376
        }
1✔
377
        if len(subChangesParams.docIDs()) > 0 {
2✔
378
                auditFields[base.AuditFieldDocIDs] = subChangesParams.docIDs()
1✔
379
                auditFields[base.AuditFieldFilter] = base.DocIDsFilter
1✔
380
        }
1✔
381
        if continuous {
2✔
382
                auditFields[base.AuditFieldFeedType] = "continuous"
1✔
383
        } else {
2✔
384
                auditFields[base.AuditFieldFeedType] = "normal"
1✔
385
        }
1✔
386
        if len(channels) > 0 {
2✔
387
                auditFields[base.AuditFieldChannels] = channels
1✔
388
        }
1✔
389
        base.Audit(bh.loggingCtx, base.AuditIDChangesFeedStarted, auditFields)
1✔
390
        return nil
1✔
391
}
392

393
func (bh *blipHandler) handleUnsubChanges(rq *blip.Message) error {
1✔
394
        collectionCtx := bh.collectionCtx
1✔
395
        collectionCtx.changesCtxLock.Lock()
1✔
396
        defer collectionCtx.changesCtxLock.Unlock()
1✔
397
        collectionCtx.changesCtxCancel()
1✔
398
        return nil
1✔
399
}
1✔
400

401
type clientType uint8
402

403
const (
404
        clientTypeCBL2 clientType = iota
405
        clientTypeSGR2
406
)
407

408
type sendChangesOptions struct {
409
        docIDs            []string
410
        since             SequenceID
411
        continuous        bool
412
        activeOnly        bool
413
        batchSize         int
414
        channels          base.Set
415
        clientType        clientType
416
        revocations       bool
417
        ignoreNoConflicts bool
418
        changesCtx        context.Context
419
        requestPlusSeq    uint64
420
}
421

422
type changesDeletedFlag uint
423

424
const (
425
        // Bitfield flags used to build changes deleted property below
426
        changesDeletedFlagDeleted changesDeletedFlag = 0b001
427
        changesDeletedFlagRevoked changesDeletedFlag = 0b010
428
        changesDeletedFlagRemoved changesDeletedFlag = 0b100
429
)
430

431
func (flag changesDeletedFlag) HasFlag(deletedFlag changesDeletedFlag) bool {
1✔
432
        return flag&deletedFlag != 0
1✔
433
}
1✔
434

435
// sendChanges will start a changes feed and send changes. Returns bool to indicate whether the changes feed finished and all changes were sent. The error value is only used to indicate a fatal error, where the blip connection should be terminated. If the blip connection is disconnected by the client, the error will be nil, but the boolean parameter will be false.
436
func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (bool, error) {
1✔
437
        defer func() {
2✔
438
                if panicked := recover(); panicked != nil {
1✔
439
                        bh.replicationStats.NumHandlersPanicked.Add(1)
×
440
                        base.WarnfCtx(bh.loggingCtx, "[%s] PANIC sending changes: %v\n%s", bh.blipContext.ID, panicked, debug.Stack())
×
441
                }
×
442
        }()
443

444
        base.InfofCtx(bh.loggingCtx, base.KeySync, "Sending changes since %v", opts.since)
1✔
445

1✔
446
        options := ChangesOptions{
1✔
447
                Since:          opts.since,
1✔
448
                Conflicts:      false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437)
1✔
449
                Continuous:     opts.continuous,
1✔
450
                ActiveOnly:     opts.activeOnly,
1✔
451
                Revocations:    opts.revocations,
1✔
452
                clientType:     opts.clientType,
1✔
453
                ChangesCtx:     opts.changesCtx,
1✔
454
                RequestPlusSeq: opts.requestPlusSeq,
1✔
455
        }
1✔
456

1✔
457
        channelSet := opts.channels
1✔
458
        if channelSet == nil {
2✔
459
                channelSet = base.SetOf(channels.AllChannelWildcard)
1✔
460
        }
1✔
461

462
        caughtUp := false
1✔
463
        pendingChanges := make([][]interface{}, 0, opts.batchSize)
1✔
464
        sendPendingChangesAt := func(minChanges int) error {
2✔
465
                if len(pendingChanges) >= minChanges {
2✔
466
                        if err := bh.sendBatchOfChanges(sender, pendingChanges, opts.ignoreNoConflicts); err != nil {
2✔
467
                                return err
1✔
468
                        }
1✔
469
                        pendingChanges = make([][]interface{}, 0, opts.batchSize)
1✔
470
                }
471
                return nil
1✔
472
        }
473

474
        // Create a distinct database instance for changes, to avoid races between reloadUser invocation in changes.go
475
        // and BlipSyncContext user access.
476
        changesDb, err := bh.copyDatabaseCollectionWithUser(bh.collectionIdx)
1✔
477
        if err != nil {
1✔
478
                base.WarnfCtx(bh.loggingCtx, "[%s] error sending changes: %v", bh.blipContext.ID, err)
×
479
                return false, err
×
480
        }
×
481

482
        forceClose, err := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error {
2✔
483
                base.DebugfCtx(bh.loggingCtx, base.KeySync, "    Sending %d changes", len(changes))
1✔
484
                for _, change := range changes {
2✔
485
                        if !strings.HasPrefix(change.ID, "_") {
2✔
486
                                // If change is a removal and we're running with protocol V3 and change change is not a tombstone
1✔
487
                                // fall into 3.0 removal handling.
1✔
488
                                // Changes with change.Revoked=true have already evaluated UserHasDocAccess in changes.go, don't check again.
1✔
489
                                if change.allRemoved && bh.activeCBMobileSubprotocol >= CBMobileReplicationV3 && !change.Deleted && !change.Revoked && !bh.db.Options.UnsupportedOptions.BlipSendDocsWithChannelRemoval {
2✔
490
                                        // If client doesn't want removals / revocations, don't send change
1✔
491
                                        if !opts.revocations {
2✔
492
                                                continue
1✔
493
                                        }
494

495
                                        // If the user has access to the doc through another channel don't send change
496
                                        userHasAccessToDoc, err := UserHasDocAccess(bh.loggingCtx, changesDb, change.ID)
1✔
497
                                        if err == nil && userHasAccessToDoc {
2✔
498
                                                continue
1✔
499
                                        }
500
                                        // If we can't determine user access due to an error, log error and fall through to send change anyway.
501
                                        // In the event of an error we should be cautious and send a revocation anyway, even if the user
502
                                        // may actually have an alternate access method. This is the safer approach security-wise and
503
                                        // also allows for a recovery if the user notices they are missing a doc they should have access
504
                                        // to. A recovery option would be to trigger a mutation of the document for it to be sent in a
505
                                        // subsequent changes request. If we were to avoid sending a removal there is no recovery
506
                                        // option to then trigger that removal later on.
507
                                        if err != nil {
1✔
508
                                                base.WarnfCtx(bh.loggingCtx, "Unable to determine whether user has access to %s, will send removal: %v", base.UD(change.ID), err)
×
509
                                        }
×
510

511
                                }
512
                                for _, item := range change.Changes {
2✔
513
                                        changeRow := bh.buildChangesRow(change, item["rev"])
1✔
514
                                        pendingChanges = append(pendingChanges, changeRow)
1✔
515
                                        if err := sendPendingChangesAt(opts.batchSize); err != nil {
2✔
516
                                                return err
1✔
517
                                        }
1✔
518
                                }
519
                        }
520
                }
521
                if caughtUp || len(changes) == 0 {
2✔
522
                        if err := sendPendingChangesAt(1); err != nil {
2✔
523
                                return err
1✔
524
                        }
1✔
525
                        if !caughtUp {
2✔
526
                                caughtUp = true
1✔
527
                                // Signal to client that it's caught up
1✔
528
                                if err := bh.sendBatchOfChanges(sender, nil, opts.ignoreNoConflicts); err != nil {
1✔
529
                                        return err
×
530
                                }
×
531
                        }
532
                }
533
                return nil
1✔
534
        })
535

536
        // On forceClose, send notify to trigger immediate exit from change waiter
537
        if forceClose {
2✔
538
                user := ""
1✔
539
                if bh.db.User() != nil {
2✔
540
                        user = bh.db.User().Name()
1✔
541
                }
1✔
542
                bh.db.DatabaseContext.NotifyTerminatedChanges(bh.loggingCtx, user)
1✔
543
        }
544
        return (err == nil && !forceClose), err
1✔
545
}
546

547
func (bh *blipHandler) buildChangesRow(change *ChangeEntry, revID string) []interface{} {
1✔
548
        var changeRow []interface{}
1✔
549

1✔
550
        if bh.activeCBMobileSubprotocol >= CBMobileReplicationV3 {
2✔
551
                deletedFlags := changesDeletedFlag(0)
1✔
552
                if change.Deleted {
2✔
553
                        deletedFlags |= changesDeletedFlagDeleted
1✔
554
                }
1✔
555
                if change.Revoked {
2✔
556
                        deletedFlags |= changesDeletedFlagRevoked
1✔
557
                }
1✔
558
                if change.allRemoved {
2✔
559
                        deletedFlags |= changesDeletedFlagRemoved
1✔
560
                }
1✔
561

562
                changeRow = []interface{}{change.Seq, change.ID, revID, deletedFlags}
1✔
563
                if deletedFlags == 0 {
2✔
564
                        changeRow = changeRow[0:3]
1✔
565
                }
1✔
566

567
        } else {
1✔
568
                changeRow = []interface{}{change.Seq, change.ID, revID, change.Deleted}
1✔
569
                if !change.Deleted {
2✔
570
                        changeRow = changeRow[0:3]
1✔
571
                }
1✔
572
        }
573

574
        return changeRow
1✔
575
}
576

577
func (bh *blipHandler) sendBatchOfChanges(sender *blip.Sender, changeArray [][]interface{}, ignoreNoConflicts bool) error {
1✔
578
        outrq := blip.NewRequest()
1✔
579
        outrq.SetProfile("changes")
1✔
580
        if ignoreNoConflicts {
2✔
581
                outrq.Properties[ChangesMessageIgnoreNoConflicts] = trueProperty
1✔
582
        }
1✔
583
        if bh.collectionIdx != nil {
2✔
584
                outrq.Properties[BlipCollection] = strconv.Itoa(*bh.collectionIdx)
1✔
585
        }
1✔
586
        err := outrq.SetJSONBody(changeArray)
1✔
587
        if err != nil {
1✔
588
                base.InfofCtx(bh.loggingCtx, base.KeyAll, "Error setting changes: %v", err)
×
589
        }
×
590

591
        if len(changeArray) > 0 {
2✔
592
                // Check for user updates before creating the db copy for handleChangesResponse
1✔
593
                if err := bh.refreshUser(); err != nil {
1✔
594
                        return err
×
595
                }
×
596

597
                handleChangesResponseDbCollection, err := bh.copyDatabaseCollectionWithUser(bh.collectionIdx)
1✔
598
                if err != nil {
1✔
599
                        return err
×
600
                }
×
601

602
                sendTime := time.Now()
1✔
603
                if !bh.sendBLIPMessage(sender, outrq) {
2✔
604
                        return ErrClosedBLIPSender
1✔
605
                }
1✔
606

607
                bh.inFlightChangesThrottle <- struct{}{}
1✔
608
                atomic.AddInt64(&bh.changesPendingResponseCount, 1)
1✔
609

1✔
610
                bh.replicationStats.SendChangesCount.Add(int64(len(changeArray)))
1✔
611
                // Spawn a goroutine to await the client's response:
1✔
612
                go func(bh *blipHandler, sender *blip.Sender, response *blip.Message, changeArray [][]interface{}, sendTime time.Time, dbCollection *DatabaseCollectionWithUser) {
2✔
613
                        if err := bh.handleChangesResponse(bh.loggingCtx, sender, response, changeArray, sendTime, dbCollection, bh.collectionIdx); err != nil {
2✔
614
                                base.WarnfCtx(bh.loggingCtx, "Error from bh.handleChangesResponse: %v", err)
1✔
615
                                if bh.fatalErrorCallback != nil {
1✔
616
                                        bh.fatalErrorCallback(err)
×
617
                                }
×
618
                        }
619

620
                        // Sent all of the revs for this changes batch, allow another changes batch to be sent.
621
                        select {
1✔
622
                        case <-bh.inFlightChangesThrottle:
1✔
623
                        case <-bh.terminator:
×
624
                        }
625

626
                        atomic.AddInt64(&bh.changesPendingResponseCount, -1)
1✔
627
                }(bh, sender, outrq.Response(), changeArray, sendTime, handleChangesResponseDbCollection)
628
        } else {
1✔
629
                outrq.SetNoReply(true)
1✔
630
                if !bh.sendBLIPMessage(sender, outrq) {
1✔
631
                        return ErrClosedBLIPSender
×
632
                }
×
633
        }
634

635
        if len(changeArray) > 0 {
2✔
636
                sequence := changeArray[0][0].(SequenceID)
1✔
637
                base.InfofCtx(bh.loggingCtx, base.KeySync, "Sent %d changes to client, from seq %s", len(changeArray), sequence.String())
1✔
638
        } else {
2✔
639
                base.InfofCtx(bh.loggingCtx, base.KeySync, "Sent all changes to client")
1✔
640
        }
1✔
641

642
        return nil
1✔
643
}
644

645
// Handles a "changes" request, i.e. a set of changes pushed by the client
646
func (bh *blipHandler) handleChanges(rq *blip.Message) error {
1✔
647
        var ignoreNoConflicts bool
1✔
648
        if val := rq.Properties[ChangesMessageIgnoreNoConflicts]; val != "" {
2✔
649
                ignoreNoConflicts = val == trueProperty
1✔
650
        }
1✔
651

652
        if !ignoreNoConflicts && !bh.collection.AllowConflicts() {
1✔
653
                return ErrUseProposeChanges
×
654
        }
×
655

656
        var changeList [][]interface{}
1✔
657
        if err := rq.ReadJSONBody(&changeList); err != nil {
1✔
658
                base.WarnfCtx(bh.loggingCtx, "Handle changes got error: %v", err)
×
659
                return err
×
660
        }
×
661

662
        collectionCtx := bh.collectionCtx
1✔
663
        bh.logEndpointEntry(rq.Profile(), fmt.Sprintf("#Changes:%d", len(changeList)))
1✔
664
        if len(changeList) == 0 {
2✔
665
                // An empty changeList is sent when a one-shot replication sends its final changes
1✔
666
                // message, or a continuous replication catches up *for the first time*.
1✔
667
                // Note that this doesn't mean that rev messages associated with previous changes
1✔
668
                // messages have been fully processed
1✔
669
                if collectionCtx.emptyChangesMessageCallback != nil {
2✔
670
                        collectionCtx.emptyChangesMessageCallback()
1✔
671
                }
1✔
672
                return nil
1✔
673
        }
674
        output := bytes.NewBuffer(make([]byte, 0, 100*len(changeList)))
1✔
675
        output.Write([]byte("["))
1✔
676
        jsonOutput := base.JSONEncoder(output)
1✔
677
        nWritten := 0
1✔
678
        nRequested := 0
1✔
679

1✔
680
        // Include changes messages w/ proposeChanges stats, although CBL should only be using proposeChanges
1✔
681
        startTime := time.Now()
1✔
682
        bh.replicationStats.HandleChangesCount.Add(int64(len(changeList)))
1✔
683
        defer func() {
2✔
684
                bh.replicationStats.HandleChangesTime.Add(time.Since(startTime).Nanoseconds())
1✔
685
        }()
1✔
686

687
        // DocID+RevID -> SeqNo
688
        expectedSeqs := make(map[IDAndRev]SequenceID, 0)
1✔
689
        alreadyKnownSeqs := make([]SequenceID, 0)
1✔
690

1✔
691
        for _, change := range changeList {
2✔
692
                docID := change[1].(string)
1✔
693
                revID := change[2].(string)
1✔
694
                missing, possible := bh.collection.RevDiff(bh.loggingCtx, docID, []string{revID})
1✔
695
                if nWritten > 0 {
2✔
696
                        output.Write([]byte(","))
1✔
697
                }
1✔
698

699
                deletedFlags := changesDeletedFlag(0)
1✔
700
                if len(change) > 3 {
2✔
701
                        switch v := change[3].(type) {
1✔
702
                        case json.Number:
1✔
703
                                deletedIntFlag, err := v.Int64()
1✔
704
                                if err != nil {
1✔
705
                                        base.ErrorfCtx(bh.loggingCtx, "Failed to parse deletedFlags: %v", err)
×
706
                                        continue
×
707
                                }
708
                                deletedFlags = changesDeletedFlag(deletedIntFlag)
1✔
709
                        case bool:
1✔
710
                                deletedFlags = changesDeletedFlagDeleted
1✔
711
                        default:
×
712
                                base.ErrorfCtx(bh.loggingCtx, "Unknown type for deleted field in changes message: %T", v)
×
713
                                continue
×
714
                        }
715

716
                }
717

718
                if bh.purgeOnRemoval && bh.activeCBMobileSubprotocol >= CBMobileReplicationV3 &&
1✔
719
                        (deletedFlags.HasFlag(changesDeletedFlagRevoked) || deletedFlags.HasFlag(changesDeletedFlagRemoved)) {
2✔
720
                        err := bh.collection.Purge(bh.loggingCtx, docID, true)
1✔
721
                        if err != nil {
2✔
722
                                base.WarnfCtx(bh.loggingCtx, "Failed to purge document: %v", err)
1✔
723
                        }
1✔
724
                        bh.replicationStats.HandleRevDocsPurgedCount.Add(1)
1✔
725

1✔
726
                        // Fall into skip sending case
1✔
727
                        missing = nil
1✔
728
                }
729

730
                if missing == nil {
2✔
731
                        // already have this rev, tell the peer to skip sending it
1✔
732
                        output.Write([]byte("0"))
1✔
733
                        if collectionCtx.sgr2PullAlreadyKnownSeqsCallback != nil {
2✔
734
                                seq, err := ParseJSONSequenceID(seqStr(bh.loggingCtx, change[0]))
1✔
735
                                if err != nil {
1✔
736
                                        base.WarnfCtx(bh.loggingCtx, "Unable to parse known sequence %q for %q / %q: %v", change[0], base.UD(docID), revID, err)
×
737
                                } else {
1✔
738
                                        // we're not able to checkpoint a sequence we can't parse and aren't expecting so just skip the callback if we errored
1✔
739
                                        alreadyKnownSeqs = append(alreadyKnownSeqs, seq)
1✔
740
                                }
1✔
741
                        }
742
                } else {
1✔
743
                        // we want this rev, send possible ancestors to the peer
1✔
744
                        nRequested++
1✔
745
                        if len(possible) == 0 {
2✔
746
                                output.Write([]byte("[]"))
1✔
747
                        } else {
2✔
748
                                err := jsonOutput.Encode(possible)
1✔
749
                                if err != nil {
1✔
750
                                        base.InfofCtx(bh.loggingCtx, base.KeyAll, "Error encoding json: %v", err)
×
751
                                }
×
752
                        }
753

754
                        // skip parsing seqno if we're not going to use it (no callback defined)
755
                        if collectionCtx.sgr2PullAddExpectedSeqsCallback != nil {
2✔
756
                                seq, err := ParseJSONSequenceID(seqStr(bh.loggingCtx, change[0]))
1✔
757
                                if err != nil {
1✔
758
                                        // We've already asked for the doc/rev for the sequence so assume we're going to receive it... Just log this and carry on
×
759
                                        base.WarnfCtx(bh.loggingCtx, "Unable to parse expected sequence %q for %q / %q: %v", change[0], base.UD(docID), revID, err)
×
760
                                } else {
1✔
761
                                        expectedSeqs[IDAndRev{DocID: docID, RevID: revID}] = seq
1✔
762
                                }
1✔
763
                        }
764
                }
765
                nWritten++
1✔
766
        }
767
        output.Write([]byte("]"))
1✔
768
        response := rq.Response()
1✔
769
        if bh.sgCanUseDeltas {
1✔
770
                base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on handleChanges response")
×
771
                response.Properties[ChangesResponseDeltas] = trueProperty
×
772
                bh.replicationStats.HandleChangesDeltaRequestedCount.Add(int64(nRequested))
×
773
        }
×
774
        response.SetCompressed(true)
1✔
775
        response.SetBody(output.Bytes())
1✔
776

1✔
777
        if collectionCtx.sgr2PullAddExpectedSeqsCallback != nil {
2✔
778
                collectionCtx.sgr2PullAddExpectedSeqsCallback(expectedSeqs)
1✔
779
        }
1✔
780
        if collectionCtx.sgr2PullAlreadyKnownSeqsCallback != nil {
2✔
781
                collectionCtx.sgr2PullAlreadyKnownSeqsCallback(alreadyKnownSeqs...)
1✔
782
        }
1✔
783

784
        return nil
1✔
785
}
786

787
// Handles a "proposeChanges" request, similar to "changes" but in no-conflicts mode
788
func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
1✔
789

1✔
790
        // we don't know whether this batch of changes has completed because they look like unsolicited revs to us,
1✔
791
        // but we can stop clients swarming us with these causing CheckProposedRev work
1✔
792
        bh.inFlightChangesThrottle <- struct{}{}
1✔
793
        defer func() { <-bh.inFlightChangesThrottle }()
2✔
794

795
        includeConflictRev := false
1✔
796
        if val := rq.Properties[ProposeChangesConflictsIncludeRev]; val != "" {
2✔
797
                includeConflictRev = val == trueProperty
1✔
798
        }
1✔
799

800
        var changeList [][]interface{}
1✔
801
        if err := rq.ReadJSONBody(&changeList); err != nil {
1✔
802
                return err
×
803
        }
×
804
        bh.logEndpointEntry(rq.Profile(), fmt.Sprintf("#Changes: %d", len(changeList)))
1✔
805
        if len(changeList) == 0 {
1✔
806
                return nil
×
807
        }
×
808
        output := bytes.NewBuffer(make([]byte, 0, 5*len(changeList)))
1✔
809
        output.Write([]byte("["))
1✔
810
        nWritten := 0
1✔
811

1✔
812
        // proposeChanges stats
1✔
813
        startTime := time.Now()
1✔
814
        bh.replicationStats.HandleChangesCount.Add(int64(len(changeList)))
1✔
815
        defer func() {
2✔
816
                bh.replicationStats.HandleChangesTime.Add(time.Since(startTime).Nanoseconds())
1✔
817
        }()
1✔
818

819
        for i, change := range changeList {
2✔
820
                docID := change[0].(string)
1✔
821
                revID := change[1].(string)
1✔
822
                parentRevID := ""
1✔
823
                if len(change) > 2 {
2✔
824
                        parentRevID = change[2].(string)
1✔
825
                }
1✔
826
                status, currentRev := bh.collection.CheckProposedRev(bh.loggingCtx, docID, revID, parentRevID)
1✔
827
                if status == ProposedRev_OK_IsNew {
2✔
828
                        // Remember that the doc doesn't exist locally, in order to optimize the upcoming Put:
1✔
829
                        bh.collectionCtx.notePendingInsertion(docID)
1✔
830
                } else if status != ProposedRev_OK {
3✔
831
                        // Reject the proposed change.
1✔
832
                        // Skip writing trailing zeroes; but if we write a number afterwards we have to catch up
1✔
833
                        if nWritten > 0 {
2✔
834
                                output.Write([]byte(","))
1✔
835
                        }
1✔
836
                        for ; nWritten < i; nWritten++ {
2✔
837
                                output.Write([]byte("0,"))
1✔
838
                        }
1✔
839
                        if includeConflictRev && status == ProposedRev_Conflict {
2✔
840
                                revEntry := IncludeConflictRevEntry{Status: status, Rev: currentRev}
1✔
841
                                entryBytes, marshalErr := base.JSONMarshal(revEntry)
1✔
842
                                if marshalErr != nil {
1✔
843
                                        base.WarnfCtx(bh.loggingCtx, "Unable to marshal proposeChangesEntry as includeConflictRev - falling back to status-only entry.  Error: %v", marshalErr)
×
844
                                        output.Write([]byte(strconv.FormatInt(int64(status), 10)))
×
845
                                }
×
846
                                output.Write(entryBytes)
1✔
847

848
                        } else {
1✔
849
                                output.Write([]byte(strconv.FormatInt(int64(status), 10)))
1✔
850
                        }
1✔
851
                        nWritten++
1✔
852
                }
853
        }
854
        output.Write([]byte("]"))
1✔
855
        response := rq.Response()
1✔
856
        if bh.sgCanUseDeltas {
2✔
857
                base.DebugfCtx(bh.loggingCtx, base.KeyAll, "Setting deltas=true property on proposeChanges response")
1✔
858
                response.Properties[ChangesResponseDeltas] = trueProperty
1✔
859
        }
1✔
860
        response.SetCompressed(true)
1✔
861
        response.SetBody(output.Bytes())
1✔
862
        return nil
1✔
863
}
864

865
// ////// DOCUMENTS:
866

867
func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sender, docID, revID string, deltaSrcRevID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error {
×
868
        bsc.replicationStats.SendRevDeltaRequestedCount.Add(1)
×
869

×
870
        revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID)
×
871
        if err == ErrForbidden { // nolint: gocritic // can't convert if/else if to switch since base.IsFleeceDeltaError is not switchable
×
872
                return err
×
873
        } else if base.IsFleeceDeltaError(err) {
×
874
                // Something went wrong in the diffing library. We want to know about this!
×
875
                base.WarnfCtx(ctx, "Falling back to full body replication. Error generating delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
×
876
                return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
×
877
        } else if err == base.ErrDeltaSourceIsTombstone {
×
878
                base.TracefCtx(ctx, base.KeySync, "Falling back to full body replication. Delta source %s is tombstone. Unable to generate delta to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
×
879
                return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
×
880
        } else if err != nil {
×
881
                base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
×
882
                return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
×
883
        }
×
884

885
        if redactedRev != nil {
×
886
                history := toHistory(redactedRev.History, knownRevs, maxHistory)
×
887
                properties := blipRevMessageProperties(history, redactedRev.Deleted, seq, "")
×
888
                return bsc.sendRevisionWithProperties(ctx, sender, docID, revID, collectionIdx, redactedRev.BodyBytes, nil, properties, seq, nil)
×
889
        }
×
890

891
        if revDelta == nil {
×
892
                base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
×
893
                return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
×
894
        }
×
895

896
        resendFullRevisionFunc := func() error {
×
897
                base.InfofCtx(ctx, base.KeySync, "Resending revision as full body. Peer couldn't process delta %s from %s to %s for key %s", base.UD(revDelta.DeltaBytes), deltaSrcRevID, revID, base.UD(docID))
×
898
                return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx)
×
899
        }
×
900

901
        base.TracefCtx(ctx, base.KeySync, "docID: %s - delta: %v", base.UD(docID), base.UD(string(revDelta.DeltaBytes)))
×
902
        if err := bsc.sendDelta(ctx, sender, docID, collectionIdx, deltaSrcRevID, revDelta, seq, resendFullRevisionFunc); err != nil {
×
903
                return err
×
904
        }
×
905

906
        // We'll consider this one doc read for collection stats purposes, since GetDelta doesn't go through the normal getRev codepath.
907
        handleChangesResponseCollection.collectionStats.NumDocReads.Add(1)
×
908
        handleChangesResponseCollection.collectionStats.DocReadsBytes.Add(int64(len(revDelta.DeltaBytes)))
×
909

×
910
        bsc.replicationStats.SendRevDeltaSentCount.Add(1)
×
911
        return nil
×
912
}
913

914
func (bh *blipHandler) handleNoRev(rq *blip.Message) error {
1✔
915
        docID, revID := rq.Properties[NorevMessageId], rq.Properties[NorevMessageRev]
1✔
916
        var seqStr string
1✔
917
        if bh.activeCBMobileSubprotocol <= CBMobileReplicationV2 && bh.clientType == BLIPClientTypeSGR2 {
1✔
918
                seqStr = rq.Properties[NorevMessageSeq]
×
919
        } else {
1✔
920
                seqStr = rq.Properties[NorevMessageSequence]
1✔
921
        }
1✔
922
        base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "%s: norev for doc %q / %q seq:%q - error: %q - reason: %q",
1✔
923
                rq.String(), base.UD(docID), revID, seqStr, rq.Properties[NorevMessageError], rq.Properties[NorevMessageReason])
1✔
924

1✔
925
        if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
1✔
926
                seq, err := ParseJSONSequenceID(seqStr)
×
927
                if err != nil {
×
928
                        base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from norev message: %v - not tracking for checkpointing", seqStr, err)
×
929
                } else {
×
930
                        bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
×
931
                }
×
932
        }
933

934
        // Couchbase Lite always sends noreply=true for norev profiles
935
        // but for testing purposes, it's useful to know which handler processed the message
936
        if !rq.NoReply() && rq.Properties[SGShowHandler] == trueProperty {
2✔
937
                response := rq.Response()
1✔
938
                response.Properties[SGHandler] = "handleNoRev"
1✔
939
        }
1✔
940

941
        return nil
1✔
942
}
943

944
type processRevStats struct {
945
        count            *base.SgwIntStat // Increments when rev processed successfully
946
        errorCount       *base.SgwIntStat
947
        deltaRecvCount   *base.SgwIntStat
948
        bytes            *base.SgwIntStat
949
        processingTime   *base.SgwIntStat
950
        docsPurgedCount  *base.SgwIntStat
951
        throttledRevs    *base.SgwIntStat
952
        throttledRevTime *base.SgwIntStat
953
}
954

955
// Processes a "rev" request, i.e. client is pushing a revision body
956
// stats must always be provided, along with all the fields filled with valid pointers
957
func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err error) {
1✔
958
        startTime := time.Now()
1✔
959
        defer func() {
2✔
960
                stats.processingTime.Add(time.Since(startTime).Nanoseconds())
1✔
961
                if err == nil {
2✔
962
                        stats.count.Add(1)
1✔
963
                } else {
2✔
964
                        stats.errorCount.Add(1)
1✔
965
                }
1✔
966
        }()
967

968
        // throttle concurrent revs
969
        if cap(bh.inFlightRevsThrottle) > 0 {
2✔
970
                select {
1✔
971
                case bh.inFlightRevsThrottle <- struct{}{}:
1✔
972
                default:
1✔
973
                        stats.throttledRevs.Add(1)
1✔
974
                        throttleStart := time.Now()
1✔
975
                        bh.inFlightRevsThrottle <- struct{}{}
1✔
976
                        stats.throttledRevTime.Add(time.Since(throttleStart).Nanoseconds())
1✔
977
                }
978
                defer func() { <-bh.inFlightRevsThrottle }()
2✔
979
        }
980

981
        // addRevisionParams := newAddRevisionParams(rq)
982
        revMessage := RevMessage{Message: rq}
1✔
983

1✔
984
        // Doc metadata comes from the BLIP message metadata, not magic document properties:
1✔
985
        docID, found := revMessage.ID()
1✔
986
        revID, rfound := revMessage.Rev()
1✔
987
        if !found || !rfound {
1✔
988
                return base.HTTPErrorf(http.StatusBadRequest, "Missing docID or revID")
×
989
        }
×
990

991
        if bh.readOnly {
1✔
992
                return base.HTTPErrorf(http.StatusForbidden, "Replication context is read-only, docID: %s, revID:%s", docID, revID)
×
993
        }
×
994

995
        base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s %s", bh.serialNumber, rq.Profile(), revMessage.String())
1✔
996

1✔
997
        bodyBytes, err := rq.Body()
1✔
998
        if err != nil {
1✔
999
                return err
×
1000
        }
×
1001

1002
        base.TracefCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Properties:%v  Body:%s", bh.serialNumber, base.UD(revMessage.Properties), base.UD(string(bodyBytes)))
1✔
1003

1✔
1004
        stats.bytes.Add(int64(len(bodyBytes)))
1✔
1005

1✔
1006
        if bh.BlipSyncContext.purgeOnRemoval && bytes.Contains(bodyBytes, []byte(`"`+BodyRemoved+`":`)) {
1✔
1007
                var body Body
×
1008
                if err := body.Unmarshal(bodyBytes); err != nil {
×
1009
                        return err
×
1010
                }
×
1011
                if removed, ok := body[BodyRemoved].(bool); ok && removed {
×
1012
                        base.InfofCtx(bh.loggingCtx, base.KeySync, "Purging doc %v - removed at rev %v", base.UD(docID), revID)
×
1013
                        if err := bh.collection.Purge(bh.loggingCtx, docID, true); err != nil {
×
1014
                                return err
×
1015
                        }
×
1016

1017
                        stats.docsPurgedCount.Add(1)
×
1018
                        if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
×
1019
                                seqStr := rq.Properties[RevMessageSequence]
×
1020
                                seq, err := ParseJSONSequenceID(seqStr)
×
1021
                                if err != nil {
×
1022
                                        base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqStr, err)
×
1023
                                } else {
×
1024
                                        bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
×
1025
                                }
×
1026
                        }
1027
                        return nil
×
1028
                }
1029
        }
1030

1031
        newDoc := &Document{
1✔
1032
                ID:    docID,
1✔
1033
                RevID: revID,
1✔
1034
        }
1✔
1035
        newDoc.UpdateBodyBytes(bodyBytes)
1✔
1036

1✔
1037
        injectedAttachmentsForDelta := false
1✔
1038
        if deltaSrcRevID, isDelta := revMessage.DeltaSrc(); isDelta && !revMessage.Deleted() {
1✔
1039
                if !bh.sgCanUseDeltas {
×
1040
                        return base.HTTPErrorf(http.StatusBadRequest, "Deltas are disabled for this peer")
×
1041
                }
×
1042

1043
                //  TODO: Doing a GetRevCopy here duplicates some rev cache retrieval effort, since deltaRevSrc is always
1044
                //        going to be the current rev (no conflicts), and PutExistingRev will need to retrieve the
1045
                //        current rev over again.  Should push this handling down PutExistingRev and use the version
1046
                //        returned via callback in WriteUpdate, but blocked by moving attachment metadata to a rev property first
1047
                //        (otherwise we don't have information needed to do downloadOrVerifyAttachments below prior to PutExistingRev)
1048

1049
                // Note: Using GetRevCopy here, and not direct rev cache retrieval, because it's still necessary to apply access check
1050
                //       while retrieving deltaSrcRevID.  Couchbase Lite replication guarantees client has access to deltaSrcRevID,
1051
                //       due to no-conflict write restriction, but we still need to enforce security here to prevent leaking data about previous
1052
                //       revisions to malicious actors (in the scenario where that user has write but not read access).
1053
                deltaSrcRev, err := bh.collection.GetRev(bh.loggingCtx, docID, deltaSrcRevID, false, nil)
×
1054
                if err != nil {
×
1055
                        return base.HTTPErrorf(http.StatusUnprocessableEntity, "Can't fetch doc %s for deltaSrc=%s %v", base.UD(docID), deltaSrcRevID, err)
×
1056
                }
×
1057

1058
                // Receiving a delta to be applied on top of a tombstone is not valid.
1059
                if deltaSrcRev.Deleted {
×
1060
                        return base.HTTPErrorf(http.StatusUnprocessableEntity, "Can't use delta. Found tombstone for doc %s deltaSrc=%s", base.UD(docID), deltaSrcRevID)
×
1061
                }
×
1062

1063
                deltaSrcBody, err := deltaSrcRev.MutableBody()
×
1064
                if err != nil {
×
1065
                        return base.HTTPErrorf(http.StatusUnprocessableEntity, "Unable to unmarshal mutable body for doc %s deltaSrc=%s %v", base.UD(docID), deltaSrcRevID, err)
×
1066
                }
×
1067

1068
                if deltaSrcBody[BodyRemoved] != nil {
×
1069
                        return base.HTTPErrorf(http.StatusUnprocessableEntity, "Can't use delta. Found _removed property for doc %s deltaSrc=%s", base.UD(docID), deltaSrcRevID)
×
1070
                }
×
1071
                // Stamp attachments so we can patch them
1072
                if len(deltaSrcRev.Attachments) > 0 {
×
1073
                        deltaSrcBody[BodyAttachments] = map[string]interface{}(deltaSrcRev.Attachments)
×
1074
                        injectedAttachmentsForDelta = true
×
1075
                }
×
1076

1077
                deltaSrcMap := map[string]interface{}(deltaSrcBody)
×
1078
                err = base.Patch(&deltaSrcMap, newDoc.Body(bh.loggingCtx))
×
1079
                // err should only ever be a FleeceDeltaError here - but to be defensive, handle other errors too (e.g. somehow reaching this code in a CE build)
×
1080
                if err != nil {
×
1081
                        // Something went wrong in the diffing library. We want to know about this!
×
1082
                        base.WarnfCtx(bh.loggingCtx, "Error patching deltaSrc %s with %s for doc %s with delta - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
×
1083
                        return base.HTTPErrorf(http.StatusUnprocessableEntity, "Error patching deltaSrc with delta: %s", err)
×
1084
                }
×
1085

1086
                newDoc.UpdateBody(deltaSrcMap)
×
1087
                base.TracefCtx(bh.loggingCtx, base.KeySync, "docID: %s - body after patching: %v", base.UD(docID), base.UD(deltaSrcMap))
×
1088
                stats.deltaRecvCount.Add(1)
×
1089
        }
1090

1091
        err = validateBlipBody(bh.loggingCtx, bodyBytes, newDoc)
1✔
1092
        if err != nil {
2✔
1093
                return err
1✔
1094
        }
1✔
1095

1096
        // Handle and pull out expiry
1097
        if bytes.Contains(bodyBytes, []byte(BodyExpiry)) {
2✔
1098
                body := newDoc.Body(bh.loggingCtx)
1✔
1099
                expiry, err := body.ExtractExpiry()
1✔
1100
                if err != nil {
2✔
1101
                        return base.HTTPErrorf(http.StatusBadRequest, "Invalid expiry: %v", err)
1✔
1102
                }
1✔
1103
                newDoc.DocExpiry = expiry
1✔
1104
                newDoc.UpdateBody(body)
1✔
1105
        }
1106

1107
        newDoc.Deleted = revMessage.Deleted()
1✔
1108

1✔
1109
        // noconflicts flag from LiteCore
1✔
1110
        // https://github.com/couchbase/couchbase-lite-core/wiki/Replication-Protocol#rev
1✔
1111
        revNoConflicts := false
1✔
1112
        if val, ok := rq.Properties[RevMessageNoConflicts]; ok {
2✔
1113
                var err error
1✔
1114
                revNoConflicts, err = strconv.ParseBool(val)
1✔
1115
                if err != nil {
1✔
1116
                        return base.HTTPErrorf(http.StatusBadRequest, "Invalid value for noconflicts: %s", err)
×
1117
                }
×
1118
        }
1119

1120
        history := []string{revID}
1✔
1121
        if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" {
2✔
1122
                history = append(history, strings.Split(historyStr, ",")...)
1✔
1123
        }
1✔
1124

1125
        var rawBucketDoc *sgbucket.BucketDocument
1✔
1126

1✔
1127
        // Pull out attachments
1✔
1128
        if injectedAttachmentsForDelta || bytes.Contains(bodyBytes, []byte(BodyAttachments)) {
2✔
1129
                body := newDoc.Body(bh.loggingCtx)
1✔
1130
                // The bytes.Contains([]byte(BodyAttachments)) check will pass even if _attachments is not a toplevel key but rather a nested key or subkey. That check is an optimization to avoid having to unmarshal the document if there are no attachments. Therefore, check again that the unmarshalled body contains BodyAttachments.
1✔
1131
                if body[BodyAttachments] != nil {
2✔
1132

1✔
1133
                        var currentBucketDoc *Document
1✔
1134

1✔
1135
                        // Look at attachments with revpos > the last common ancestor's
1✔
1136
                        if len(history) > 0 {
2✔
1137
                                currentDoc, rawDoc, err := bh.collection.GetDocumentWithRaw(bh.loggingCtx, docID, DocUnmarshalSync)
1✔
1138
                                // If we're able to obtain current doc data then we should use the common ancestor generation++ for min revpos
1✔
1139
                                // as we will already have any attachments on the common ancestor so don't need to ask for them.
1✔
1140
                                // Otherwise we'll have to go as far back as we can in the doc history and choose the last entry in there.
1✔
1141
                                if err == nil {
2✔
1142
                                        rawBucketDoc = rawDoc
1✔
1143
                                        currentBucketDoc = currentDoc
1✔
1144
                                }
1✔
1145
                        }
1146
                        // updatedRevPos is the revpos of the new revision, to be added to attachment metadata if needed for CBL<4.0 compatibility. revpos is no longer used by Sync Gateway.
1147
                        updatedRevPos, _ := ParseRevID(bh.loggingCtx, revID)
1✔
1148

1✔
1149
                        // currentDigests is a map from attachment name to the current bucket doc digest,
1✔
1150
                        // for any attachments on the incoming document that are also on the current bucket doc
1✔
1151
                        var currentDigests map[string]string
1✔
1152

1✔
1153
                        // Do we have a previous doc? If not don't need to do this check
1✔
1154
                        if currentBucketDoc != nil {
2✔
1155
                                bodyAtts := GetBodyAttachments(body)
1✔
1156
                                currentDigests = make(map[string]string, len(bodyAtts))
1✔
1157
                                for name, value := range bodyAtts {
2✔
1158
                                        // Check if we have this attachment name already, if we do, continue check
1✔
1159
                                        currentAttachment, ok := currentBucketDoc.Attachments[name]
1✔
1160
                                        if !ok {
2✔
1161
                                                // If we don't have this attachment already, ensure incoming revpos is greater than minRevPos, otherwise
1✔
1162
                                                // update to ensure it's fetched and uploaded
1✔
1163
                                                bodyAtts[name].(map[string]interface{})["revpos"] = updatedRevPos
1✔
1164
                                                continue
1✔
1165
                                        }
1166

1167
                                        currentAttachmentMeta, ok := currentAttachment.(map[string]interface{})
1✔
1168
                                        if !ok {
1✔
1169
                                                return base.HTTPErrorf(http.StatusInternalServerError, "Current attachment data is invalid")
×
1170
                                        }
×
1171

1172
                                        currentAttachmentDigest, ok := currentAttachmentMeta["digest"].(string)
1✔
1173
                                        if !ok {
1✔
1174
                                                return base.HTTPErrorf(http.StatusInternalServerError, "Current attachment data is invalid")
×
1175
                                        }
×
1176
                                        currentDigests[name] = currentAttachmentDigest
1✔
1177

1✔
1178
                                        incomingAttachmentMeta, ok := value.(map[string]interface{})
1✔
1179
                                        if !ok {
1✔
1180
                                                return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
×
1181
                                        }
×
1182

1183
                                        // If this attachment has data then we're fine, this isn't a stub attachment and therefore doesn't
1184
                                        // need the check.
1185
                                        if incomingAttachmentMeta["data"] != nil {
1✔
1186
                                                continue
×
1187
                                        }
1188

1189
                                        incomingAttachmentDigest, ok := incomingAttachmentMeta["digest"].(string)
1✔
1190
                                        if !ok {
1✔
1191
                                                return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
×
1192
                                        }
×
1193

1194
                                        // Compare the revpos and attachment digest. If incoming revpos is less than or equal to minRevPos and
1195
                                        // digest is different we need to override the revpos and set it to the current revision to ensure
1196
                                        // the attachment is requested and stored. revpos provided for SG/CBL<4.0 compatibility but is no longer used by Sync Gateway.
1197
                                        if currentAttachmentDigest != incomingAttachmentDigest {
2✔
1198
                                                bodyAtts[name].(map[string]interface{})["revpos"] = updatedRevPos
1✔
1199
                                        }
1✔
1200
                                }
1201

1202
                                body[BodyAttachments] = bodyAtts
1✔
1203
                        }
1204

1205
                        if err := bh.downloadOrVerifyAttachments(rq.Sender, body, docID, currentDigests); err != nil {
2✔
1206
                                base.ErrorfCtx(bh.loggingCtx, "Error during downloadOrVerifyAttachments for doc %s/%s: %v", base.UD(docID), revID, err)
1✔
1207
                                return err
1✔
1208
                        }
1✔
1209

1210
                        newDoc.DocAttachments = GetBodyAttachments(body)
1✔
1211
                        delete(body, BodyAttachments)
1✔
1212
                        newDoc.UpdateBody(body)
1✔
1213
                }
1214

1215
        }
1216

1217
        if rawBucketDoc == nil && bh.collectionCtx.checkPendingInsertion(docID) {
2✔
1218
                // At the time we handled the `propseChanges` request, there was no doc with this docID
1✔
1219
                // in the bucket. As an optimization, tell PutExistingRev to assume the doc still doesn't
1✔
1220
                // exist and bypass getting it from the bucket during the save. If we're wrong, the save
1✔
1221
                // will fail with a CAS mismatch and the retry will fetch the existing doc.
1✔
1222
                rawBucketDoc = &sgbucket.BucketDocument{} // empty struct with zero CAS
1✔
1223
        }
1✔
1224

1225
        // Finally, save the revision (with the new attachments inline)
1226
        // If a conflict resolver is defined for the handler, write with conflict resolution.
1227

1228
        // If the doc is a tombstone we want to allow conflicts when running SGR2
1229
        // bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2
1230
        forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2)
1✔
1231
        if bh.conflictResolver != nil {
2✔
1232
                _, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc)
1✔
1233
        } else {
2✔
1234
                _, _, err = bh.collection.PutExistingRev(bh.loggingCtx, newDoc, history, revNoConflicts, forceAllowConflictingTombstone, rawBucketDoc)
1✔
1235
        }
1✔
1236
        if err != nil {
2✔
1237
                return err
1✔
1238
        }
1✔
1239

1240
        if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
2✔
1241
                seqProperty := rq.Properties[RevMessageSequence]
1✔
1242
                seq, err := ParseJSONSequenceID(seqProperty)
1✔
1243
                if err != nil {
1✔
1244
                        base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqProperty, err)
×
1245
                } else {
1✔
1246
                        bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
1✔
1247
                }
1✔
1248
        }
1249

1250
        return nil
1✔
1251
}
1252

1253
// Handler for when a rev is received from the client
1254
func (bh *blipHandler) handleRev(rq *blip.Message) (err error) {
1✔
1255
        stats := processRevStats{
1✔
1256
                count:            bh.replicationStats.HandleRevCount,
1✔
1257
                errorCount:       bh.replicationStats.HandleRevErrorCount,
1✔
1258
                deltaRecvCount:   bh.replicationStats.HandleRevDeltaRecvCount,
1✔
1259
                bytes:            bh.replicationStats.HandleRevBytes,
1✔
1260
                processingTime:   bh.replicationStats.HandleRevProcessingTime,
1✔
1261
                docsPurgedCount:  bh.replicationStats.HandleRevDocsPurgedCount,
1✔
1262
                throttledRevs:    bh.replicationStats.HandleRevThrottledCount,
1✔
1263
                throttledRevTime: bh.replicationStats.HandleRevThrottledTime,
1✔
1264
        }
1✔
1265
        return bh.processRev(rq, &stats)
1✔
1266
}
1✔
1267

1268
// ////// ATTACHMENTS:
1269

1270
func (bh *blipHandler) handleProveAttachment(rq *blip.Message) error {
1✔
1271
        nonce, err := rq.Body()
1✔
1272
        if err != nil {
1✔
1273
                return err
×
1274
        }
×
1275

1276
        if len(nonce) == 0 {
1✔
1277
                return base.HTTPErrorf(http.StatusBadRequest, "no nonce sent with proveAttachment")
×
1278
        }
×
1279

1280
        digest, ok := rq.Properties[ProveAttachmentDigest]
1✔
1281
        if !ok {
1✔
1282
                return base.HTTPErrorf(http.StatusBadRequest, "no digest sent with proveAttachment")
×
1283
        }
×
1284

1285
        allowedAttachment := bh.allowedAttachment(digest)
1✔
1286
        attachmentKey := MakeAttachmentKey(allowedAttachment.version, allowedAttachment.docID, digest)
1✔
1287
        attData, err := bh.collection.GetAttachment(attachmentKey)
1✔
1288
        if err != nil {
1✔
1289
                if bh.clientType == BLIPClientTypeSGR2 {
×
1290
                        return ErrAttachmentNotFound
×
1291
                }
×
1292
                if base.IsDocNotFoundError(err) {
×
1293
                        return ErrAttachmentNotFound
×
1294
                }
×
1295
                return base.HTTPErrorf(http.StatusInternalServerError, "Error getting client attachment: %v", err)
×
1296
        }
1297

1298
        proof := ProveAttachment(bh.loggingCtx, attData, nonce)
1✔
1299

1✔
1300
        resp := rq.Response()
1✔
1301
        resp.SetBody([]byte(proof))
1✔
1302

1✔
1303
        bh.replicationStats.HandleProveAttachment.Add(1)
1✔
1304

1✔
1305
        return nil
1✔
1306
}
1307

1308
// Received a "getAttachment" request
1309
func (bh *blipHandler) handleGetAttachment(rq *blip.Message) error {
1✔
1310

1✔
1311
        getAttachmentParams := newGetAttachmentParams(rq)
1✔
1312
        bh.logEndpointEntry(rq.Profile(), getAttachmentParams.String())
1✔
1313

1✔
1314
        digest := getAttachmentParams.digest()
1✔
1315
        if digest == "" {
1✔
1316
                return base.HTTPErrorf(http.StatusBadRequest, "Missing 'digest'")
×
1317
        }
×
1318

1319
        docID := ""
1✔
1320
        attachmentAllowedKey := digest
1✔
1321
        if bh.activeCBMobileSubprotocol >= CBMobileReplicationV3 {
2✔
1322
                docID = getAttachmentParams.docID()
1✔
1323
                if docID == "" {
1✔
1324
                        return base.HTTPErrorf(http.StatusBadRequest, "Missing 'docID'")
×
1325
                }
×
1326
                attachmentAllowedKey = docID + digest
1✔
1327
        }
1328

1329
        // attachmentName should only be used for logging, it is one possible name of an attachment for a given document if multiple attachments share the same digest.
1330
        allowedAttachment := bh.allowedAttachment(attachmentAllowedKey)
1✔
1331
        if allowedAttachment.counter <= 0 {
2✔
1332
                return base.HTTPErrorf(http.StatusForbidden, "Attachment's doc not being synced")
1✔
1333
        }
1✔
1334

1335
        if bh.activeCBMobileSubprotocol <= CBMobileReplicationV2 {
2✔
1336
                docID = allowedAttachment.docID
1✔
1337
        }
1✔
1338

1339
        attachmentKey := MakeAttachmentKey(allowedAttachment.version, docID, digest)
1✔
1340
        attachment, err := bh.collection.GetAttachment(attachmentKey)
1✔
1341
        if err != nil {
1✔
1342
                return err
×
1343

×
1344
        }
×
1345
        base.DebugfCtx(bh.loggingCtx, base.KeySync, "Sending attachment with digest=%q (%.2f KB)", digest, float64(len(attachment))/float64(1024))
1✔
1346
        response := rq.Response()
1✔
1347
        response.SetBody(attachment)
1✔
1348
        response.SetCompressed(rq.Properties[BlipCompress] == trueProperty)
1✔
1349
        bh.replicationStats.HandleGetAttachment.Add(1)
1✔
1350
        bh.replicationStats.HandleGetAttachmentBytes.Add(int64(len(attachment)))
1✔
1351
        base.Audit(bh.loggingCtx, base.AuditIDAttachmentRead, base.AuditFields{
1✔
1352
                base.AuditFieldDocID:        docID,
1✔
1353
                base.AuditFieldDocVersion:   allowedAttachment.docVersion,
1✔
1354
                base.AuditFieldAttachmentID: allowedAttachment.name,
1✔
1355
        })
1✔
1356

1✔
1357
        return nil
1✔
1358
}
1359

1360
var errNoBlipHandler = fmt.Errorf("404 - No handler for BLIP request")
1361

1362
// sendGetAttachment requests the full attachment from the peer.
1363
func (bh *blipHandler) sendGetAttachment(sender *blip.Sender, docID string, name string, digest string, meta map[string]interface{}) ([]byte, error) {
1✔
1364
        base.DebugfCtx(bh.loggingCtx, base.KeySync, "    Asking for attachment %q for doc %s (digest %s)", base.UD(name), base.UD(docID), digest)
1✔
1365
        outrq := blip.NewRequest()
1✔
1366
        outrq.SetProfile(MessageGetAttachment)
1✔
1367
        outrq.Properties[GetAttachmentDigest] = digest
1✔
1368
        if bh.collectionIdx != nil {
2✔
1369
                outrq.Properties[BlipCollection] = strconv.Itoa(*bh.collectionIdx)
1✔
1370
        }
1✔
1371
        if isCompressible(name, meta) {
2✔
1372
                outrq.Properties[BlipCompress] = trueProperty
1✔
1373
        }
1✔
1374

1375
        if bh.activeCBMobileSubprotocol >= CBMobileReplicationV3 {
2✔
1376
                outrq.Properties[GetAttachmentID] = docID
1✔
1377
        }
1✔
1378

1379
        if !bh.sendBLIPMessage(sender, outrq) {
1✔
1380
                return nil, ErrClosedBLIPSender
×
1381
        }
×
1382

1383
        resp := outrq.Response()
1✔
1384

1✔
1385
        respBody, err := resp.Body()
1✔
1386
        if err != nil {
1✔
1387
                return nil, err
×
1388
        }
×
1389

1390
        if resp.Properties[BlipErrorCode] != "" {
1✔
1391
                return nil, fmt.Errorf("error %s from getAttachment: %s", resp.Properties[BlipErrorCode], respBody)
×
1392
        }
×
1393
        lNum, metaLengthOK := meta["length"]
1✔
1394
        if !metaLengthOK {
1✔
1395
                return nil, fmt.Errorf("no attachment length provided in meta")
×
1396
        }
×
1397

1398
        metaLength, ok := base.ToInt64(lNum)
1✔
1399
        if !ok {
1✔
1400
                return nil, fmt.Errorf("invalid attachment length %q found in meta", lNum)
×
1401
        }
×
1402

1403
        // Verify that the attachment we received matches the metadata stored in the document
1404
        expectedLength := int(metaLength)
1✔
1405
        actualLength := len(respBody)
1✔
1406
        if actualLength != expectedLength {
2✔
1407
                return nil, base.HTTPErrorf(http.StatusBadRequest, "Incorrect data sent for attachment with digest: %s (length mismatch - expected %d got %d)", digest, expectedLength, actualLength)
1✔
1408
        }
1✔
1409

1410
        actualDigest := Sha1DigestKey(respBody)
1✔
1411
        if actualDigest != digest {
2✔
1412
                return nil, base.HTTPErrorf(http.StatusBadRequest, "Incorrect data sent for attachment with digest: %s (digest mismatch - got %s)", digest, actualDigest)
1✔
1413
        }
1✔
1414

1415
        bh.replicationStats.GetAttachment.Add(1)
1✔
1416
        bh.replicationStats.GetAttachmentBytes.Add(metaLength)
1✔
1417

1✔
1418
        return respBody, nil
1✔
1419
}
1420

1421
// sendProveAttachment asks the peer to prove they have the attachment, without actually sending it.
1422
// This is to prevent clients from creating a doc with a digest for an attachment they otherwise can't access, in order to download it.
1423
func (bh *blipHandler) sendProveAttachment(sender *blip.Sender, docID, name, digest string, knownData []byte) error {
1✔
1424
        base.DebugfCtx(bh.loggingCtx, base.KeySync, "    Verifying attachment %q for doc %s (digest %s)", base.UD(name), base.UD(docID), digest)
1✔
1425
        nonce, proof, err := GenerateProofOfAttachment(bh.loggingCtx, knownData)
1✔
1426
        if err != nil {
1✔
1427
                return err
×
1428
        }
×
1429
        outrq := blip.NewRequest()
1✔
1430
        outrq.SetProfile(MessageProveAttachment)
1✔
1431
        outrq.Properties[ProveAttachmentDigest] = digest
1✔
1432
        if bh.collectionIdx != nil {
2✔
1433
                outrq.Properties[BlipCollection] = strconv.Itoa(*bh.collectionIdx)
1✔
1434
        }
1✔
1435
        outrq.SetBody(nonce)
1✔
1436
        if !bh.sendBLIPMessage(sender, outrq) {
1✔
1437
                return ErrClosedBLIPSender
×
1438
        }
×
1439

1440
        resp := outrq.Response()
1✔
1441

1✔
1442
        body, err := resp.Body()
1✔
1443
        if err != nil {
1✔
1444
                base.WarnfCtx(bh.loggingCtx, "Error returned for proveAttachment message for doc %s (digest %s).  Error: %v", base.UD(docID), digest, err)
×
1445
                return err
×
1446
        }
×
1447

1448
        if resp.Type() == blip.ErrorType &&
1✔
1449
                resp.Properties[BlipErrorDomain] == blip.BLIPErrorDomain &&
1✔
1450
                resp.Properties[BlipErrorCode] == "404" {
1✔
1451
                return errNoBlipHandler
×
1452
        }
×
1453

1454
        if resp.Type() == blip.ErrorType &&
1✔
1455
                errorDomainIsHTTP(resp) &&
1✔
1456
                resp.Properties[BlipErrorCode] == "404" {
2✔
1457
                return ErrAttachmentNotFound
1✔
1458
        }
1✔
1459

1460
        if string(body) != proof {
1✔
1461
                base.WarnfCtx(bh.loggingCtx, "Incorrect proof for attachment %s : I sent nonce %x, expected proof %q, got %q", digest, base.MD(nonce), base.MD(proof), base.MD(string(body)))
×
1462
                return base.HTTPErrorf(http.StatusForbidden, "Incorrect proof for attachment %s", digest)
×
1463
        }
×
1464

1465
        bh.replicationStats.ProveAttachment.Add(1)
1✔
1466

1✔
1467
        base.InfofCtx(bh.loggingCtx, base.KeySync, "proveAttachment successful for doc %s (digest %s)", base.UD(docID), digest)
1✔
1468
        return nil
1✔
1469
}
1470

1471
// For each attachment in the revision, makes sure it's in the database, asking the client to
1472
// upload it if necessary. This method blocks until all the attachments have been processed.
1473
func (bh *blipHandler) downloadOrVerifyAttachments(sender *blip.Sender, body Body, docID string, currentDigests map[string]string) error {
1✔
1474
        return bh.collection.ForEachStubAttachment(body, docID, currentDigests,
1✔
1475
                func(name string, digest string, knownData []byte, meta map[string]interface{}) ([]byte, error) {
2✔
1476
                        // Request attachment if we don't have it
1✔
1477
                        if knownData == nil {
2✔
1478
                                return bh.sendGetAttachment(sender, docID, name, digest, meta)
1✔
1479
                        }
1✔
1480

1481
                        // Ask client to prove they have the attachment without sending it
1482
                        proveAttErr := bh.sendProveAttachment(sender, docID, name, digest, knownData)
1✔
1483
                        if proveAttErr == nil {
2✔
1484
                                return nil, nil
1✔
1485
                        }
1✔
1486

1487
                        // Peer doesn't support proveAttachment or does not have attachment. Fall back to using getAttachment as proof.
1488
                        if proveAttErr == errNoBlipHandler || proveAttErr == ErrAttachmentNotFound {
2✔
1489
                                base.InfofCtx(bh.loggingCtx, base.KeySync, "Peer sent prove attachment error %v, falling back to getAttachment for proof in doc %s (digest %s)", proveAttErr, base.UD(docID), digest)
1✔
1490
                                _, getAttErr := bh.sendGetAttachment(sender, docID, name, digest, meta)
1✔
1491
                                if getAttErr == nil {
2✔
1492
                                        // Peer proved they have matching attachment. Keep existing attachment
1✔
1493
                                        return nil, nil
1✔
1494
                                }
1✔
1495
                                return nil, getAttErr
×
1496
                        }
1497

1498
                        return nil, proveAttErr
×
1499
                })
1500
}
1501

1502
func (bsc *BlipSyncContext) incrementSerialNumber() uint64 {
1✔
1503
        return atomic.AddUint64(&bsc.handlerSerialNumber, 1)
1✔
1504
}
1✔
1505

1506
func (bsc *BlipSyncContext) addAllowedAttachments(docID string, docVersion string, attMeta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
1✔
1507
        if len(attMeta) == 0 {
2✔
1508
                return
1✔
1509
        }
1✔
1510

1511
        bsc.allowedAttachmentsLock.Lock()
1✔
1512
        defer bsc.allowedAttachmentsLock.Unlock()
1✔
1513

1✔
1514
        if bsc.allowedAttachments == nil {
2✔
1515
                bsc.allowedAttachments = make(map[string]AllowedAttachment, 100)
1✔
1516
        }
1✔
1517
        for _, attachment := range attMeta {
2✔
1518
                key := allowedAttachmentKey(docID, attachment.digest, activeSubprotocol)
1✔
1519
                att, found := bsc.allowedAttachments[key]
1✔
1520
                if found {
2✔
1521
                        att.counter++
1✔
1522
                        bsc.allowedAttachments[key] = att
1✔
1523
                } else {
2✔
1524
                        bsc.allowedAttachments[key] = AllowedAttachment{
1✔
1525
                                version:    attachment.version,
1✔
1526
                                counter:    1,
1✔
1527
                                docID:      docID,
1✔
1528
                                docVersion: docVersion,
1✔
1529
                                name:       attachment.name,
1✔
1530
                        }
1✔
1531
                }
1✔
1532
        }
1533

1534
        base.TracefCtx(bsc.loggingCtx, base.KeySync, "addAllowedAttachments, added: %v current set: %v", attMeta, bsc.allowedAttachments)
1✔
1535
}
1536

1537
func (bsc *BlipSyncContext) removeAllowedAttachments(docID string, attMeta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
1✔
1538
        if len(attMeta) == 0 {
2✔
1539
                return
1✔
1540
        }
1✔
1541

1542
        bsc.allowedAttachmentsLock.Lock()
1✔
1543
        defer bsc.allowedAttachmentsLock.Unlock()
1✔
1544

1✔
1545
        for _, attachment := range attMeta {
2✔
1546
                key := allowedAttachmentKey(docID, attachment.digest, activeSubprotocol)
1✔
1547
                att, found := bsc.allowedAttachments[key]
1✔
1548
                if found {
2✔
1549
                        if n := att.counter; n > 1 {
2✔
1550
                                att.counter = n - 1
1✔
1551
                                bsc.allowedAttachments[key] = att
1✔
1552
                        } else {
2✔
1553
                                delete(bsc.allowedAttachments, key)
1✔
1554
                        }
1✔
1555
                }
1556
        }
1557

1558
        base.TracefCtx(bsc.loggingCtx, base.KeySync, "removeAllowedAttachments, removed: %v current set: %v", attMeta, bsc.allowedAttachments)
1✔
1559
}
1560

1561
func allowedAttachmentKey(docID, digest string, activeCBMobileSubprotocol CBMobileSubprotocolVersion) string {
1✔
1562
        if activeCBMobileSubprotocol >= CBMobileReplicationV3 {
2✔
1563
                return docID + digest
1✔
1564
        }
1✔
1565
        return digest
1✔
1566
}
1567

1568
func (bh *blipHandler) logEndpointEntry(profile, endpoint string) {
1✔
1569
        base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s %s", bh.serialNumber, profile, endpoint)
1✔
1570
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc