• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 19677328788

25 Nov 2025 04:50PM UTC coverage: 53.843% (+0.04%) from 53.804%
19677328788

push

github

web-flow
feat/fix(databroker): add options to sync latest stream (#5947)

## Summary


The SyncLatest / Sync stream used to not sync databroker record options.
However with the introduction of databroker record indexing, which is
required for SSH authorization code flow, options should be synced
across replicas in clustered mode.

## Related issues


[ENG-3234](https://linear.app/pomerium/issue/ENG-3234/databroker-clustered-mode-does-not-sync-options)

## User Explanation

N/A

## Checklist

- [X] reference any related issues
- [x] updated unit tests
- [X] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [x] ready for review

212 of 339 new or added lines in 19 files covered. (62.54%)

19 existing lines in 8 files now uncovered.

28949 of 53766 relevant lines covered (53.84%)

91.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.68
/pkg/synccache/sync_cache.go
1
package synccache
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "iter"
9
        "slices"
10

11
        pebble "github.com/cockroachdb/pebble/v2"
12
        "github.com/google/uuid"
13
        "google.golang.org/grpc/codes"
14
        status "google.golang.org/grpc/status"
15
        "google.golang.org/protobuf/proto"
16
        "google.golang.org/protobuf/types/known/wrapperspb"
17

18
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
19
        "github.com/pomerium/pomerium/pkg/pebbleutil"
20
)
21

22
const (
23
        fieldServerVersion byte = 1
24
        fieldRecordVersion byte = 2
25
        fieldRecord        byte = 3
26
)
27

28
var (
29
        syncCacheUUIDNamespace = uuid.MustParse("c9acb8d4-f10a-4e3c-9308-c285e1ebfb58")
30
        marshalOptions         = &proto.MarshalOptions{AllowPartial: true, Deterministic: true}
31
        unmarshalOptions       = &proto.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
32
)
33

34
// A SyncCache uses the databroker Sync and SyncLatest methods to populate a cache of records.
35
// To use the SyncCache call Sync followed by Records.
36
//
37
// Data is stored in a pebble database in this format:
38
//
39
//        {{prefix}}{{uuidv5(recordType)}}0x01: the server version
40
//        {{prefix}}{{uuidv5(recordType)}}0x02: the latest record version
41
//        {{prefix}}{{uuidv5(recordType)}}0x03{{recordID}}: a databroker record
42
//
43
// Values are protobuf encoded.
44
type SyncCache interface {
45
        // Clear deletes all the data for the given record type in the sync cache.
46
        Clear(recordType string) error
47
        // Records yields the databroker records stored in the cache.
48
        Records(recordType string) iter.Seq2[*databroker.Record, error]
49
        // Sync syncs the cache with the databroker.
50
        Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error
51
}
52

53
type syncCache struct {
54
        db     *pebble.DB
55
        prefix []byte
56

57
        iterOptions  *pebble.IterOptions
58
        writeOptions *pebble.WriteOptions
59
}
60

61
// New creates a new SyncCache.
62
func New(db *pebble.DB, prefix []byte) SyncCache {
1✔
63
        return &syncCache{
1✔
64
                db:     db,
1✔
65
                prefix: prefix,
1✔
66
        }
1✔
67
}
1✔
68

69
func (c *syncCache) Clear(recordType string) error {
1✔
70
        // delete all the existing data
1✔
71
        err := c.pebbleDeletePrefix(c.db, c.recordTypePrefix(recordType))
1✔
72
        if err != nil {
1✔
73
                return fmt.Errorf("sync-cache: error clearing data from cache (record-type=%s): %w", recordType, err)
×
74
        }
×
75

76
        return nil
1✔
77
}
78

79
func (c *syncCache) Records(recordType string) iter.Seq2[*databroker.Record, error] {
4✔
80
        prefix := c.recordPrefix(recordType)
4✔
81
        iterOptions := new(pebble.IterOptions)
4✔
82
        if c.iterOptions != nil {
4✔
83
                *iterOptions = *c.iterOptions
×
84
        }
×
85
        iterOptions.LowerBound = prefix
4✔
86
        iterOptions.UpperBound = pebbleutil.PrefixToUpperBound(prefix)
4✔
87
        return func(yield func(*databroker.Record, error) bool) {
8✔
88
                for record, err := range pebbleutil.Iterate(c.db, iterOptions, pebbleIteratorToRecord) {
14✔
89
                        if err != nil {
10✔
90
                                yield(nil, fmt.Errorf("sync-cache: error iterating over cached records (record-type=%s): %w", recordType, err))
×
91
                                return
×
92
                        }
×
93

94
                        if !yield(record, nil) {
10✔
95
                                return
×
96
                        }
×
97
                }
98
        }
99
}
100

101
func (c *syncCache) Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error {
4✔
102
        serverVersion, recordVersion := wrapperspb.UInt64(0), wrapperspb.UInt64(0)
4✔
103
        err := errors.Join(
4✔
104
                c.pebbleGetProto(c.db, c.serverVersionKey(recordType), serverVersion),
4✔
105
                c.pebbleGetProto(c.db, c.recordVersionKey(recordType), recordVersion),
4✔
106
        )
4✔
107
        if errors.Is(err, pebble.ErrNotFound) {
5✔
108
                // if we've never synced, use sync latest
1✔
109
                return c.syncLatest(ctx, client, recordType)
1✔
110
        } else if err != nil {
4✔
111
                return fmt.Errorf("sync-cache: error retrieving server and record version from cache (record-type=%s): %w", recordType, err)
×
112
        }
×
113

114
        return c.sync(ctx, client, recordType, serverVersion.Value, recordVersion.Value)
3✔
115
}
116

117
func (c *syncCache) recordKey(recordType, recordID string) []byte {
5✔
118
        return slices.Concat(c.recordPrefix(recordType), []byte(recordID))
5✔
119
}
5✔
120

121
func (c *syncCache) recordPrefix(recordType string) []byte {
9✔
122
        return append(c.recordTypePrefix(recordType), fieldRecord)
9✔
123
}
9✔
124

125
func (c *syncCache) recordTypePrefix(recordType string) []byte {
26✔
126
        id := uuid.NewSHA1(syncCacheUUIDNamespace, []byte(recordType))
26✔
127
        return slices.Concat(c.prefix, id[:])
26✔
128
}
26✔
129

130
func (c *syncCache) recordVersionKey(recordType string) []byte {
8✔
131
        return append(c.recordTypePrefix(recordType), fieldRecordVersion)
8✔
132
}
8✔
133

134
func (c *syncCache) serverVersionKey(recordType string) []byte {
6✔
135
        return append(c.recordTypePrefix(recordType), fieldServerVersion)
6✔
136
}
6✔
137

138
func (c *syncCache) sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string, serverVersion, recordVersion uint64) error {
3✔
139
        streamCtx, cancel := context.WithCancel(ctx)
3✔
140
        defer cancel()
3✔
141

3✔
142
        stream, err := client.Sync(streamCtx, &databroker.SyncRequest{
3✔
143
                Type:          recordType,
3✔
144
                ServerVersion: serverVersion,
3✔
145
                RecordVersion: recordVersion,
3✔
146
                Wait:          proto.Bool(false),
3✔
147
        })
3✔
148
        if err != nil {
3✔
149
                return fmt.Errorf("sync-cache: error starting sync stream (record-type=%s): %w", recordType, err)
×
150
        }
×
151

152
        // batch the updates together
153
        batch := c.db.NewBatch()
3✔
154
        defer batch.Close()
3✔
155

3✔
156
        for {
8✔
157
                res, err := stream.Recv()
5✔
158
                if errors.Is(err, io.EOF) {
7✔
159
                        break
2✔
160
                } else if status.Code(err) == codes.Aborted {
4✔
161
                        cancel()
1✔
162
                        // the server version changed, so use sync latest
1✔
163
                        return c.syncLatest(ctx, client, recordType)
1✔
164
                } else if err != nil {
3✔
165
                        return fmt.Errorf("sync-cache: error receiving message from sync stream (record-type=%s): %w", recordType, err)
×
166
                }
×
167

168
                switch res := res.Response.(type) {
2✔
169

170
                case *databroker.SyncResponse_Record:
2✔
171
                        // either delete or update the record
2✔
172
                        if res.Record.DeletedAt != nil {
3✔
173
                                err = c.pebbleDelete(batch, c.recordKey(recordType, res.Record.Id))
1✔
174
                        } else {
2✔
175
                                err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record)
1✔
176
                        }
1✔
177
                        if err != nil {
2✔
NEW
178
                                return fmt.Errorf("sync-cache: error updating record in cache (record-type=%s): %w", recordType, err)
×
NEW
179
                        }
×
180

181
                        // update the record version
182
                        recordVersion = max(recordVersion, res.Record.Version)
2✔
183
                        err = c.pebbleSetProto(batch, c.recordVersionKey(recordType), wrapperspb.UInt64(recordVersion))
2✔
184
                        if err != nil {
2✔
NEW
185
                                return fmt.Errorf("sync-cache: error updating record version in cache (record-type=%s): %w", recordType, err)
×
NEW
186
                        }
×
NEW
187
                case *databroker.SyncResponse_Options:
×
NEW
188
                default:
×
NEW
189
                        panic(fmt.Sprintf("unexpected response: %T", res))
×
190
                }
191
        }
192

193
        err = batch.Commit(c.writeOptions)
2✔
194
        if err != nil {
2✔
195
                return fmt.Errorf("sync-cache: error committing changes to cache (record-type=%s): %w", recordType, err)
×
196
        }
×
197

198
        return nil
2✔
199
}
200

201
func (c *syncCache) syncLatest(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error {
2✔
202
        ctx, cancel := context.WithCancel(ctx)
2✔
203
        defer cancel()
2✔
204

2✔
205
        stream, err := client.SyncLatest(ctx, &databroker.SyncLatestRequest{
2✔
206
                Type: recordType,
2✔
207
        })
2✔
208
        if err != nil {
2✔
209
                return fmt.Errorf("sync-cache: error starting sync latest stream (record-type=%s): %w", recordType, err)
×
210
        }
×
211

212
        // batch the updates together
213
        batch := c.db.NewBatch()
2✔
214
        defer batch.Close()
2✔
215

2✔
216
        // delete all the existing data
2✔
217
        err = c.pebbleDeletePrefix(batch, c.recordTypePrefix(recordType))
2✔
218
        if err != nil {
2✔
219
                return fmt.Errorf("sync-cache: error deleting existing data from cache (record-type=%s): %w", recordType, err)
×
220
        }
×
221

222
        for {
9✔
223
                res, err := stream.Recv()
7✔
224
                if errors.Is(err, io.EOF) {
9✔
225
                        break
2✔
226
                } else if err != nil {
5✔
227
                        return fmt.Errorf("sync-cache: error receiving message from sync latest stream (record-type=%s): %w", recordType, err)
×
228
                }
×
229

230
                switch res := res.Response.(type) {
5✔
231
                case *databroker.SyncLatestResponse_Record:
3✔
232
                        // add the record
3✔
233
                        err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record)
3✔
234
                        if err != nil {
3✔
235
                                return fmt.Errorf("sync-cache: error saving record to cache (record-type=%s): %w", recordType, err)
×
236
                        }
×
237
                case *databroker.SyncLatestResponse_Versions:
2✔
238
                        // update the versions
2✔
239
                        err = errors.Join(
2✔
240
                                c.pebbleSetProto(batch, c.serverVersionKey(recordType), wrapperspb.UInt64(res.Versions.ServerVersion)),
2✔
241
                                c.pebbleSetProto(batch, c.recordVersionKey(recordType), wrapperspb.UInt64(res.Versions.LatestRecordVersion)),
2✔
242
                        )
2✔
243
                        if err != nil {
2✔
244
                                return fmt.Errorf("sync-cache: error saving versions to cache (record-type=%s): %w", recordType, err)
×
245
                        }
×
NEW
246
                case *databroker.SyncLatestResponse_Options:
×
247
                default:
×
248
                        return fmt.Errorf("sync-cache: unknown message type from sync latest stream (record-type=%s): %T", recordType, res)
×
249
                }
250
        }
251

252
        err = batch.Commit(c.writeOptions)
2✔
253
        if err != nil {
2✔
254
                return fmt.Errorf("sync-cache: error committing changes to cache (record-type=%s): %w", recordType, err)
×
255
        }
×
256

257
        return nil
2✔
258
}
259

260
func (c *syncCache) pebbleDelete(dst pebble.Writer, key []byte) error {
1✔
261
        return dst.Delete(key, c.writeOptions)
1✔
262
}
1✔
263

264
func (c *syncCache) pebbleDeletePrefix(dst pebble.Writer, prefix []byte) error {
3✔
265
        return dst.DeleteRange(prefix, pebbleutil.PrefixToUpperBound(prefix), c.writeOptions)
3✔
266
}
3✔
267

268
func (c *syncCache) pebbleGetProto(src pebble.Reader, key []byte, msg proto.Message) error {
8✔
269
        value, closer, err := src.Get(key)
8✔
270
        if err != nil {
10✔
271
                return err
2✔
272
        }
2✔
273
        err = unmarshalOptions.Unmarshal(value, msg)
6✔
274
        _ = closer.Close()
6✔
275
        return err
6✔
276
}
277

278
func (c *syncCache) pebbleSet(dst pebble.Writer, key, value []byte) error {
10✔
279
        return dst.Set(key, value, c.writeOptions)
10✔
280
}
10✔
281

282
func (c *syncCache) pebbleSetProto(dst pebble.Writer, key []byte, msg proto.Message) error {
10✔
283
        value, err := marshalOptions.Marshal(msg)
10✔
284
        if err != nil {
10✔
285
                return err
×
286
        }
×
287
        return c.pebbleSet(dst, key, value)
10✔
288
}
289

290
func pebbleIteratorToRecord(it *pebble.Iterator) (*databroker.Record, error) {
10✔
291
        value, err := it.ValueAndErr()
10✔
292
        if err != nil {
10✔
293
                return nil, err
×
294
        }
×
295

296
        record := new(databroker.Record)
10✔
297
        err = unmarshalOptions.Unmarshal(value, record)
10✔
298
        if err != nil {
10✔
299
                return nil, err
×
300
        }
×
301

302
        return record, nil
10✔
303
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc