• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 21494088809

29 Jan 2026 08:44PM UTC coverage: 44.734% (+0.02%) from 44.716%
21494088809

push

github

web-flow
core/synccache: limit batch size (#6073)

## Summary
Limit the batch size for the sync cache. Pebble has a fundamental 4GB
limit on the size of a batch of changes. With these changes, once a
batch exceeds 128MB we commit it and create a new
one.

## Related issues
#6072 


## Checklist

- [x] reference any related issues
- [x] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [x] ready for review

13 of 27 new or added lines in 1 file covered. (48.15%)

13 existing lines in 3 files now uncovered.

30948 of 69183 relevant lines covered (44.73%)

102.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.12
/pkg/synccache/sync_cache.go
1
package synccache
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "iter"
9
        "slices"
10

11
        pebble "github.com/cockroachdb/pebble/v2"
12
        "github.com/google/uuid"
13
        "google.golang.org/grpc/codes"
14
        status "google.golang.org/grpc/status"
15
        "google.golang.org/protobuf/proto"
16
        "google.golang.org/protobuf/types/known/wrapperspb"
17

18
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
19
        "github.com/pomerium/pomerium/pkg/pebbleutil"
20
)
21

22
// Field tags. One of these bytes is appended to a record type's key prefix to
// select which kind of value is stored under the resulting key.
const (
	fieldServerVersion byte = 1
	fieldRecordVersion byte = 2
	fieldRecord        byte = 3
)

// maxBatchSize is the batch length above which a pending batch is committed
// and replaced with a fresh one. Pebble panics if a batch exceeds 4GB, so
// batches are committed every 128MB.
const maxBatchSize = 128 << 20
30

31
var (
32
        syncCacheUUIDNamespace = uuid.MustParse("c9acb8d4-f10a-4e3c-9308-c285e1ebfb58")
33
        marshalOptions         = &proto.MarshalOptions{AllowPartial: true, Deterministic: true}
34
        unmarshalOptions       = &proto.UnmarshalOptions{AllowPartial: true}
35
)
36

37
// A SyncCache uses the databroker Sync and SyncLatest methods to populate a cache of records.
38
// To use the SyncCache call Sync followed by Records.
39
//
40
// Data is stored in a pebble database in this format:
41
//
42
//        {{prefix}}{{uuidv5(recordType)}}0x01: the server version
43
//        {{prefix}}{{uuidv5(recordType)}}0x02: the latest record version
44
//        {{prefix}}{{uuidv5(recordType)}}0x03{{recordID}}: a databroker record
45
//
46
// Values are protobuf encoded.
47
type SyncCache interface {
48
        // Clear deletes all the data for the given record type in the sync cache.
49
        Clear(recordType string) error
50
        // Records yields the databroker records stored in the cache.
51
        Records(recordType string) iter.Seq2[*databroker.Record, error]
52
        // Sync syncs the cache with the databroker.
53
        Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error
54
}
55

56
type syncCache struct {
57
        db     *pebble.DB
58
        prefix []byte
59

60
        iterOptions  *pebble.IterOptions
61
        writeOptions *pebble.WriteOptions
62
}
63

64
// New creates a new SyncCache.
65
func New(db *pebble.DB, prefix []byte) SyncCache {
1✔
66
        return &syncCache{
1✔
67
                db:     db,
1✔
68
                prefix: prefix,
1✔
69
        }
1✔
70
}
1✔
71

72
func (c *syncCache) Clear(recordType string) error {
1✔
73
        // delete all the existing data
1✔
74
        err := c.pebbleDeletePrefix(c.db, c.recordTypePrefix(recordType))
1✔
75
        if err != nil {
1✔
76
                return fmt.Errorf("sync-cache: error clearing data from cache (record-type=%s): %w", recordType, err)
×
77
        }
×
78

79
        return nil
1✔
80
}
81

82
func (c *syncCache) Records(recordType string) iter.Seq2[*databroker.Record, error] {
4✔
83
        prefix := c.recordPrefix(recordType)
4✔
84
        iterOptions := new(pebble.IterOptions)
4✔
85
        if c.iterOptions != nil {
4✔
86
                *iterOptions = *c.iterOptions
×
87
        }
×
88
        iterOptions.LowerBound = prefix
4✔
89
        iterOptions.UpperBound = pebbleutil.PrefixToUpperBound(prefix)
4✔
90
        return func(yield func(*databroker.Record, error) bool) {
8✔
91
                for record, err := range pebbleutil.Iterate(c.db, iterOptions, pebbleIteratorToRecord) {
14✔
92
                        if err != nil {
10✔
93
                                yield(nil, fmt.Errorf("sync-cache: error iterating over cached records (record-type=%s): %w", recordType, err))
×
94
                                return
×
95
                        }
×
96

97
                        if !yield(record, nil) {
10✔
98
                                return
×
99
                        }
×
100
                }
101
        }
102
}
103

104
func (c *syncCache) Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error {
4✔
105
        serverVersion, recordVersion := wrapperspb.UInt64(0), wrapperspb.UInt64(0)
4✔
106
        err := errors.Join(
4✔
107
                c.pebbleGetProto(c.db, c.serverVersionKey(recordType), serverVersion),
4✔
108
                c.pebbleGetProto(c.db, c.recordVersionKey(recordType), recordVersion),
4✔
109
        )
4✔
110
        if errors.Is(err, pebble.ErrNotFound) {
5✔
111
                // if we've never synced, use sync latest
1✔
112
                return c.syncLatest(ctx, client, recordType)
1✔
113
        } else if err != nil {
4✔
114
                return fmt.Errorf("sync-cache: error retrieving server and record version from cache (record-type=%s): %w", recordType, err)
×
115
        }
×
116

117
        return c.sync(ctx, client, recordType, serverVersion.Value, recordVersion.Value)
3✔
118
}
119

120
func (c *syncCache) commitAndRecreateBatch(batch **pebble.Batch) error {
4✔
121
        err := (*batch).Commit(c.writeOptions)
4✔
122
        if err != nil {
4✔
NEW
123
                return fmt.Errorf("sync-cache: error committing changes to cache: %w", err)
×
NEW
124
        }
×
125
        err = (*batch).Close()
4✔
126
        if err != nil {
4✔
NEW
127
                return fmt.Errorf("sync-cache: error closing batch: %w", err)
×
NEW
128
        }
×
129
        *batch = c.db.NewBatch()
4✔
130
        return nil
4✔
131
}
132

133
func (c *syncCache) recordKey(recordType, recordID string) []byte {
5✔
134
        return slices.Concat(c.recordPrefix(recordType), []byte(recordID))
5✔
135
}
5✔
136

137
func (c *syncCache) recordPrefix(recordType string) []byte {
9✔
138
        return append(c.recordTypePrefix(recordType), fieldRecord)
9✔
139
}
9✔
140

141
func (c *syncCache) recordTypePrefix(recordType string) []byte {
26✔
142
        id := uuid.NewSHA1(syncCacheUUIDNamespace, []byte(recordType))
26✔
143
        return slices.Concat(c.prefix, id[:])
26✔
144
}
26✔
145

146
func (c *syncCache) recordVersionKey(recordType string) []byte {
8✔
147
        return append(c.recordTypePrefix(recordType), fieldRecordVersion)
8✔
148
}
8✔
149

150
func (c *syncCache) serverVersionKey(recordType string) []byte {
6✔
151
        return append(c.recordTypePrefix(recordType), fieldServerVersion)
6✔
152
}
6✔
153

154
func (c *syncCache) sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string, serverVersion, recordVersion uint64) error {
3✔
155
        streamCtx, cancel := context.WithCancel(ctx)
3✔
156
        defer cancel()
3✔
157

3✔
158
        stream, err := client.Sync(streamCtx, &databroker.SyncRequest{
3✔
159
                Type:          recordType,
3✔
160
                ServerVersion: serverVersion,
3✔
161
                RecordVersion: recordVersion,
3✔
162
                Wait:          proto.Bool(false),
3✔
163
        })
3✔
164
        if err != nil {
3✔
165
                return fmt.Errorf("sync-cache: error starting sync stream (record-type=%s): %w", recordType, err)
×
166
        }
×
167

168
        // batch the updates together
169
        batch := c.db.NewBatch()
3✔
170
        defer func() { _ = batch.Close() }()
6✔
171

172
        for {
8✔
173
                res, err := stream.Recv()
5✔
174
                if errors.Is(err, io.EOF) {
7✔
175
                        break
2✔
176
                } else if status.Code(err) == codes.Aborted {
4✔
177
                        cancel()
1✔
178
                        // the server version changed, so use sync latest
1✔
179
                        return c.syncLatest(ctx, client, recordType)
1✔
180
                } else if err != nil {
3✔
181
                        return fmt.Errorf("sync-cache: error receiving message from sync stream (record-type=%s): %w", recordType, err)
×
182
                }
×
183

184
                switch res := res.Response.(type) {
2✔
185
                case *databroker.SyncResponse_Record:
2✔
186
                        // either delete or update the record
2✔
187
                        if res.Record.DeletedAt != nil {
3✔
188
                                err = c.pebbleDelete(batch, c.recordKey(recordType, res.Record.Id))
1✔
189
                        } else {
2✔
190
                                err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record)
1✔
191
                        }
1✔
192
                        if err != nil {
2✔
193
                                return fmt.Errorf("sync-cache: error updating record in cache (record-type=%s): %w", recordType, err)
×
194
                        }
×
195

196
                        // update the record version
197
                        recordVersion = max(recordVersion, res.Record.Version)
2✔
198
                        err = c.pebbleSetProto(batch, c.recordVersionKey(recordType), wrapperspb.UInt64(recordVersion))
2✔
199
                        if err != nil {
2✔
200
                                return fmt.Errorf("sync-cache: error updating record version in cache (record-type=%s): %w", recordType, err)
×
201
                        }
×
202
                }
203

204
                if batch.Len() > maxBatchSize {
2✔
NEW
205
                        err = c.commitAndRecreateBatch(&batch)
×
NEW
206
                        if err != nil {
×
NEW
207
                                return err
×
NEW
208
                        }
×
209
                }
210
        }
211

212
        err = c.commitAndRecreateBatch(&batch)
2✔
213
        if err != nil {
2✔
NEW
214
                return err
×
215
        }
×
216

217
        return nil
2✔
218
}
219

220
func (c *syncCache) syncLatest(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error {
2✔
221
        ctx, cancel := context.WithCancel(ctx)
2✔
222
        defer cancel()
2✔
223

2✔
224
        stream, err := client.SyncLatest(ctx, &databroker.SyncLatestRequest{
2✔
225
                Type: recordType,
2✔
226
        })
2✔
227
        if err != nil {
2✔
228
                return fmt.Errorf("sync-cache: error starting sync latest stream (record-type=%s): %w", recordType, err)
×
229
        }
×
230

231
        // batch the updates together
232
        batch := c.db.NewBatch()
2✔
233
        defer func() { _ = batch.Close() }()
4✔
234

235
        // delete all the existing data
236
        err = c.pebbleDeletePrefix(batch, c.recordTypePrefix(recordType))
2✔
237
        if err != nil {
2✔
238
                return fmt.Errorf("sync-cache: error deleting existing data from cache (record-type=%s): %w", recordType, err)
×
239
        }
×
240

241
        for {
9✔
242
                res, err := stream.Recv()
7✔
243
                if errors.Is(err, io.EOF) {
9✔
244
                        break
2✔
245
                } else if err != nil {
5✔
246
                        return fmt.Errorf("sync-cache: error receiving message from sync latest stream (record-type=%s): %w", recordType, err)
×
247
                }
×
248

249
                switch res := res.Response.(type) {
5✔
250
                case *databroker.SyncLatestResponse_Record:
3✔
251
                        // add the record
3✔
252
                        err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record)
3✔
253
                        if err != nil {
3✔
254
                                return fmt.Errorf("sync-cache: error saving record to cache (record-type=%s): %w", recordType, err)
×
255
                        }
×
256
                case *databroker.SyncLatestResponse_Versions:
2✔
257
                        // update the versions
2✔
258
                        err = errors.Join(
2✔
259
                                c.pebbleSetProto(batch, c.serverVersionKey(recordType), wrapperspb.UInt64(res.Versions.ServerVersion)),
2✔
260
                                c.pebbleSetProto(batch, c.recordVersionKey(recordType), wrapperspb.UInt64(res.Versions.LatestRecordVersion)),
2✔
261
                        )
2✔
262
                        if err != nil {
2✔
263
                                return fmt.Errorf("sync-cache: error saving versions to cache (record-type=%s): %w", recordType, err)
×
264
                        }
×
265
                }
266

267
                if batch.Len() > maxBatchSize {
5✔
NEW
268
                        err = c.commitAndRecreateBatch(&batch)
×
NEW
269
                        if err != nil {
×
NEW
270
                                return err
×
NEW
271
                        }
×
272
                }
273
        }
274

275
        err = c.commitAndRecreateBatch(&batch)
2✔
276
        if err != nil {
2✔
NEW
277
                return err
×
278
        }
×
279

280
        return nil
2✔
281
}
282

283
func (c *syncCache) pebbleDelete(dst pebble.Writer, key []byte) error {
1✔
284
        return dst.Delete(key, c.writeOptions)
1✔
285
}
1✔
286

287
func (c *syncCache) pebbleDeletePrefix(dst pebble.Writer, prefix []byte) error {
3✔
288
        return dst.DeleteRange(prefix, pebbleutil.PrefixToUpperBound(prefix), c.writeOptions)
3✔
289
}
3✔
290

291
func (c *syncCache) pebbleGetProto(src pebble.Reader, key []byte, msg proto.Message) error {
8✔
292
        value, closer, err := src.Get(key)
8✔
293
        if err != nil {
10✔
294
                return err
2✔
295
        }
2✔
296
        err = unmarshalOptions.Unmarshal(value, msg)
6✔
297
        _ = closer.Close()
6✔
298
        return err
6✔
299
}
300

301
func (c *syncCache) pebbleSet(dst pebble.Writer, key, value []byte) error {
10✔
302
        return dst.Set(key, value, c.writeOptions)
10✔
303
}
10✔
304

305
func (c *syncCache) pebbleSetProto(dst pebble.Writer, key []byte, msg proto.Message) error {
10✔
306
        value, err := marshalOptions.Marshal(msg)
10✔
307
        if err != nil {
10✔
308
                return err
×
309
        }
×
310
        return c.pebbleSet(dst, key, value)
10✔
311
}
312

313
func pebbleIteratorToRecord(it *pebble.Iterator) (*databroker.Record, error) {
10✔
314
        value, err := it.ValueAndErr()
10✔
315
        if err != nil {
10✔
316
                return nil, err
×
317
        }
×
318

319
        record := new(databroker.Record)
10✔
320
        err = unmarshalOptions.Unmarshal(value, record)
10✔
321
        if err != nil {
10✔
322
                return nil, err
×
323
        }
×
324

325
        return record, nil
10✔
326
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc