• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 18723713565

22 Oct 2025 04:56PM UTC coverage: 53.999% (-0.002%) from 54.001%
18723713565

push

github

web-flow
config: add resolvers to dns options (#5898)

## Summary

Adds `dns_resolvers` core config option, that allows to provide a list
of TCP or UDP DNS resolvers as a list of IP-only URLs like
`udp://1.1.1.1:53`

## Related issues

Ref: 
 - https://linear.app/pomerium/issue/ENG-3067

## User Explanation

<!-- How would you explain this change to the user? If this
change doesn't create any user-facing changes, you can leave
this blank. If filled out, add the `docs` label -->

## Checklist

- [ ] reference any related issues
- [ ] updated unit tests
- [ ] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [ ] ready for review

58 of 67 new or added lines in 2 files covered. (86.57%)

22 existing lines in 8 files now uncovered.

27459 of 50851 relevant lines covered (54.0%)

83.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.14
/pkg/storage/postgres/postgres.go
1
// Package postgres contains an implementation of the storage.Backend backed by postgres.
2
package postgres
3

4
import (
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sort"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/jackc/pgx/v5"
14
        "github.com/jackc/pgx/v5/pgconn"
15
        "github.com/jackc/pgx/v5/pgtype"
16
        "github.com/jackc/pgx/v5/pgxpool"
17
        "google.golang.org/protobuf/encoding/protojson"
18
        "google.golang.org/protobuf/proto"
19
        "google.golang.org/protobuf/reflect/protoregistry"
20
        "google.golang.org/protobuf/types/known/anypb"
21
        "google.golang.org/protobuf/types/known/fieldmaskpb"
22
        "google.golang.org/protobuf/types/known/timestamppb"
23

24
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
25
        "github.com/pomerium/pomerium/pkg/grpc/registry"
26
        "github.com/pomerium/pomerium/pkg/protoutil"
27
        "github.com/pomerium/pomerium/pkg/storage"
28
)
29

30
const (
31
        recordBatchSize   = 64
32
        watchPollInterval = 30 * time.Second
33
)
34

35
var (
36
        schemaName              = "pomerium"
37
        migrationInfoTableName  = "migration_info"
38
        recordsTableName        = "records"
39
        recordChangesTableName  = "record_changes"
40
        recordChangeNotifyName  = "pomerium_record_change"
41
        recordOptionsTableName  = "record_options"
42
        leasesTableName         = "leases"
43
        serviceChangeNotifyName = "pomerium_service_change"
44
        servicesTableName       = "services"
45
        checkpointsTableName    = "checkpoints"
46
)
47

48
type querier interface {
49
        Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
50
        Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
51
        QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
52
}
53

54
func clearRecords(ctx context.Context, q querier, newServerVersion uint64) error {
1✔
55
        _, err := q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordChangesTableName)
1✔
56
        if err != nil {
1✔
57
                return err
×
58
        }
×
59
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordsTableName)
1✔
60
        if err != nil {
1✔
61
                return err
×
62
        }
×
63
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordOptionsTableName)
1✔
64
        if err != nil {
1✔
65
                return err
×
66
        }
×
67
        _, err = q.Exec(ctx, `
1✔
68
                UPDATE `+schemaName+`.`+migrationInfoTableName+`
1✔
69
                SET server_version = $1
1✔
70
        `, newServerVersion)
1✔
71
        if err != nil {
1✔
72
                return err
×
73
        }
×
74

75
        _, err = q.Exec(ctx, `
1✔
76
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
77
                SET server_version = 0, record_version = 0
1✔
78
        `)
1✔
79
        if err != nil {
1✔
80
                return err
×
81
        }
×
82

83
        return nil
1✔
84
}
85

86
func deleteChangesBefore(ctx context.Context, q querier, cutoff time.Time) error {
1✔
87
        // we always want to keep at least one row in the changes table,
1✔
88
        // so we take the changes table, sort by version descending, skip the first row
1✔
89
        // and then find any changes before the cutoff
1✔
90
        _, err := q.Exec(ctx, `
1✔
91
                WITH t1 AS (
1✔
92
                        SELECT *
1✔
93
                        FROM `+schemaName+`.`+recordChangesTableName+`
1✔
94
                        ORDER BY version DESC
1✔
95
                        OFFSET 1
1✔
96
                        FOR UPDATE SKIP LOCKED
1✔
97
                ), t2 AS (
1✔
98
                        SELECT version
1✔
99
                        FROM t1
1✔
100
                        WHERE modified_at<$1
1✔
101
                        FOR UPDATE SKIP LOCKED
1✔
102
                )
1✔
103
                DELETE FROM `+schemaName+`.`+recordChangesTableName+` t3
1✔
104
                USING t2
1✔
105
                WHERE t2.version=t3.version
1✔
106
        `, cutoff)
1✔
107
        return err
1✔
108
}
1✔
109

110
func deleteExpiredServices(ctx context.Context, q querier, cutoff time.Time) (rowCount int64, err error) {
13✔
111
        cmd, err := q.Exec(ctx, `
13✔
112
                WITH t1 AS (
13✔
113
                        SELECT kind, endpoint
13✔
114
                        FROM `+schemaName+`.`+servicesTableName+`
13✔
115
                        WHERE expires_at<$1
13✔
116
                        FOR UPDATE SKIP LOCKED
13✔
117
                )
13✔
118
                DELETE FROM `+schemaName+`.`+servicesTableName+` t2
13✔
119
                USING t1
13✔
120
                WHERE t1.kind=t2.kind
13✔
121
                  AND t1.endpoint=t2.endpoint
13✔
122
        `, cutoff)
13✔
123
        if err != nil {
13✔
UNCOV
124
                return 0, err
×
UNCOV
125
        }
×
126
        return cmd.RowsAffected(), nil
13✔
127
}
128

129
func dup(record *databroker.Record) *databroker.Record {
1,354✔
130
        return proto.Clone(record).(*databroker.Record)
1,354✔
131
}
1,354✔
132

133
func enforceOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
131✔
134
        if options == nil || options.Capacity == nil {
251✔
135
                return nil
120✔
136
        }
120✔
137

138
        _, err := q.Exec(ctx, `
11✔
139
                DELETE FROM `+schemaName+`.`+recordsTableName+`
11✔
140
                WHERE type=$1
11✔
141
                  AND id NOT IN (
11✔
142
                        SELECT id
11✔
143
                        FROM `+schemaName+`.`+recordsTableName+`
11✔
144
                        WHERE type=$1
11✔
145
                        ORDER BY version DESC
11✔
146
                        LIMIT $2
11✔
147
                )
11✔
148
        `, recordType, options.GetCapacity())
11✔
149
        return err
11✔
150
}
151

152
func getRecordVersionRange(ctx context.Context, q querier) (earliestRecordVersion, latestRecordVersion uint64, err error) {
38✔
153
        err = q.QueryRow(ctx, `
38✔
154
                SELECT COALESCE(MIN(version), 0), COALESCE(MAX(version), 0)
38✔
155
                FROM `+schemaName+`.`+recordChangesTableName+`
38✔
156
        `).Scan(&earliestRecordVersion, &latestRecordVersion)
38✔
157
        return earliestRecordVersion, latestRecordVersion, err
38✔
158
}
38✔
159

160
func getCheckpoint(ctx context.Context, q querier) (serverVersion, recordVersion uint64, err error) {
1✔
161
        var sv, rv pgtype.Numeric
1✔
162
        err = q.QueryRow(ctx, `
1✔
163
                SELECT server_version, record_version
1✔
164
                FROM `+schemaName+`.`+checkpointsTableName+`
1✔
165
        `).Scan(&sv, &rv)
1✔
166
        return sv.Int.Uint64(), rv.Int.Uint64(), err
1✔
167
}
1✔
168

169
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
132✔
170
        var capacity pgtype.Int8
132✔
171
        err := q.QueryRow(ctx, `
132✔
172
                SELECT capacity
132✔
173
                FROM `+schemaName+`.`+recordOptionsTableName+`
132✔
174
                WHERE type=$1
132✔
175
        `, recordType).Scan(&capacity)
132✔
176
        if err != nil && !isNotFound(err) {
132✔
177
                return nil, err
×
178
        }
×
179
        options := new(databroker.Options)
132✔
180
        if capacity.Valid {
143✔
181
                options.Capacity = proto.Uint64(uint64(capacity.Int64))
11✔
182
        }
11✔
183
        return options, nil
132✔
184
}
185

186
type lockMode string
187

188
const (
189
        lockModeNone   lockMode = ""
190
        lockModeUpdate lockMode = "FOR UPDATE"
191
)
192

193
func getRecord(
194
        ctx context.Context, q querier, recordType, recordID string, lockMode lockMode,
195
) (*databroker.Record, error) {
1,313✔
196
        var version uint64
1,313✔
197
        var data []byte
1,313✔
198
        var modifiedAt pgtype.Timestamptz
1,313✔
199
        err := q.QueryRow(ctx, `
1,313✔
200
                SELECT version, data, modified_at
1,313✔
201
                  FROM `+schemaName+`.`+recordsTableName+`
1,313✔
202
                 WHERE type=$1 AND id=$2 `+string(lockMode),
1,313✔
203
                recordType, recordID).Scan(&version, &data, &modifiedAt)
1,313✔
204
        if isNotFound(err) {
2,317✔
205
                return nil, storage.ErrNotFound
1,004✔
206
        } else if err != nil {
1,313✔
207
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
208
        }
×
209

210
        a, err := protoutil.UnmarshalAnyJSON(data)
309✔
211
        if isUnknownType(err) {
310✔
212
                return nil, storage.ErrNotFound
1✔
213
        } else if err != nil {
309✔
214
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
215
        }
×
216

217
        return &databroker.Record{
308✔
218
                Version:    version,
308✔
219
                Type:       recordType,
308✔
220
                Id:         recordID,
308✔
221
                Data:       a,
308✔
222
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
308✔
223
        }, nil
308✔
224
}
225

226
func listChangedRecordsAfter(ctx context.Context, q querier, recordType string, lastRecordVersion uint64) ([]*databroker.Record, error) {
57✔
227
        args := []any{lastRecordVersion, recordBatchSize}
57✔
228
        query := `
57✔
229
                SELECT type, id, version, data, modified_at, deleted_at
57✔
230
                FROM ` + schemaName + `.` + recordChangesTableName + `
57✔
231
                WHERE version>$1
57✔
232
        `
57✔
233
        if recordType != "" {
91✔
234
                args = append(args, recordType)
34✔
235
                query += ` AND type=$3`
34✔
236
        }
34✔
237
        query += `
57✔
238
                ORDER BY version
57✔
239
                LIMIT $2
57✔
240
        `
57✔
241
        rows, err := q.Query(ctx, query, args...)
57✔
242
        if err != nil {
57✔
243
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
244
        }
×
245
        return pgx.CollectRows(rows, collectRecord)
57✔
246
}
247

248
func listLatestRecordsAfter(ctx context.Context, q querier, expr storage.FilterExpression, lastRecordType, lastRecordID string) ([]*databroker.Record, error) {
29✔
249
        args := []any{lastRecordType, lastRecordID, recordBatchSize}
29✔
250
        query := `
29✔
251
                SELECT type, id, version, data, modified_at, NULL::timestamptz
29✔
252
                FROM ` + schemaName + `.` + recordsTableName + `
29✔
253
                WHERE ((type>$1) OR (type=$1 AND id>$2))
29✔
254
        `
29✔
255
        if expr != nil {
48✔
256
                query += "AND "
19✔
257
                err := addFilterExpressionToQuery(&query, &args, expr)
19✔
258
                if err != nil {
19✔
259
                        return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err)
×
260
                }
×
261
        }
262
        query += `
29✔
263
                ORDER BY type, id
29✔
264
                LIMIT $3
29✔
265
        `
29✔
266
        rows, err := q.Query(ctx, query, args...)
29✔
267
        if err != nil {
29✔
268
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
269
        }
×
270
        return pgx.CollectRows(rows, collectRecord)
29✔
271
}
272

273
func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
2✔
274
        var services []*registry.Service
2✔
275

2✔
276
        query := `
2✔
277
                SELECT kind, endpoint
2✔
278
                FROM  ` + schemaName + `.` + servicesTableName + `
2✔
279
                ORDER BY kind, endpoint
2✔
280
        `
2✔
281
        rows, err := q.Query(ctx, query)
2✔
282
        if err != nil {
2✔
283
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
284
        }
×
285
        defer rows.Close()
2✔
286

2✔
287
        for rows.Next() {
5✔
288
                var kind, endpoint string
3✔
289
                err = rows.Scan(&kind, &endpoint)
3✔
290
                if err != nil {
3✔
291
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
292
                }
×
293

294
                services = append(services, &registry.Service{
3✔
295
                        Kind:     registry.ServiceKind(registry.ServiceKind_value[kind]),
3✔
296
                        Endpoint: endpoint,
3✔
297
                })
3✔
298
        }
299
        err = rows.Err()
2✔
300
        if err != nil {
2✔
301
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
302
        }
×
303

304
        return services, nil
2✔
305
}
306

307
func listTypes(ctx context.Context, q querier) ([]string, error) {
11✔
308
        query := `
11✔
309
                SELECT DISTINCT type
11✔
310
                FROM ` + schemaName + `.` + recordsTableName + `
11✔
311
        `
11✔
312
        rows, err := q.Query(ctx, query)
11✔
313
        if err != nil {
20✔
314
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
9✔
315
        }
9✔
316
        defer rows.Close()
2✔
317

2✔
318
        var types []string
2✔
319
        for rows.Next() {
6✔
320
                var recordType string
4✔
321
                err = rows.Scan(&recordType)
4✔
322
                if err != nil {
4✔
323
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
324
                }
×
325

326
                types = append(types, recordType)
4✔
327
        }
328
        err = rows.Err()
2✔
329
        if err != nil {
3✔
330
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
1✔
331
        }
1✔
332

333
        sort.Strings(types)
1✔
334
        return types, nil
1✔
335
}
336

337
func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) {
14✔
338
        tbl := schemaName + "." + leasesTableName
14✔
339
        expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl)))
14✔
340
        now := timestamptzFromTimestamppb(timestamppb.Now())
14✔
341
        err = q.QueryRow(ctx, `
14✔
342
                INSERT INTO `+tbl+` (name, id, expires_at)
14✔
343
                VALUES ($1, $2, $3)
14✔
344
                ON CONFLICT (name) DO UPDATE
14✔
345
                SET id=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $2 ELSE `+tbl+`.id END,
14✔
346
                    expires_at=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $3 ELSE `+tbl+`.expires_at END
14✔
347
                RETURNING `+tbl+`.id
14✔
348
        `, leaseName, leaseID, expiresAt, now).Scan(&leaseHolderID)
14✔
349
        return leaseHolderID, err
14✔
350
}
14✔
351

352
func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error {
1,352✔
353
        data, err := jsonbFromAny(record.GetData())
1,352✔
354
        if err != nil {
1,352✔
355
                return fmt.Errorf("postgres: failed to convert any to json: %w", err)
×
356
        }
×
357

358
        modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt())
1,352✔
359
        deletedAt := timestamptzFromTimestamppb(record.GetDeletedAt())
1,352✔
360
        indexCIDR := &pgtype.Text{Valid: false}
1,352✔
361
        if cidr := storage.GetRecordIndexCIDR(record.GetData()); cidr != nil {
1,352✔
362
                indexCIDR.String = cidr.String()
×
363
                indexCIDR.Valid = true
×
364
        }
×
365

366
        query := `
1,352✔
367
                WITH t1 AS (
1,352✔
368
                        INSERT INTO ` + schemaName + `.` + recordChangesTableName + ` (type, id, data, modified_at, deleted_at)
1,352✔
369
                        VALUES ($1, $2, $3, $4, $5)
1,352✔
370
                        RETURNING *
1,352✔
371
                )
1,352✔
372
        `
1,352✔
373
        args := []any{
1,352✔
374
                record.GetType(), record.GetId(), data, modifiedAt, deletedAt,
1,352✔
375
        }
1,352✔
376
        if record.GetDeletedAt() == nil {
2,703✔
377
                query += `
1,351✔
378
                        INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
1,351✔
379
                        VALUES ($1, $2, (SELECT version FROM t1), $3, $4, $6)
1,351✔
380
                        ON CONFLICT (type, id) DO UPDATE
1,351✔
381
                        SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
1,351✔
382
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
1,351✔
383
                `
1,351✔
384
                args = append(args, indexCIDR)
1,351✔
385
        } else {
1,352✔
386
                query += `
1✔
387
                        DELETE FROM ` + schemaName + `.` + recordsTableName + `
1✔
388
                        WHERE type=$1 AND id=$2
1✔
389
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
1✔
390
                `
1✔
391
        }
1✔
392
        err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
1,352✔
393
        if err != nil && !isNotFound(err) {
2,362✔
394
                return fmt.Errorf("postgres: failed to execute query: %w", err)
1,010✔
395
        }
1,010✔
396

397
        return nil
342✔
398
}
399

400
// patchRecord updates specific fields of an existing record.
401
func patchRecord(
402
        ctx context.Context, p *pgxpool.Pool, record *databroker.Record, fields *fieldmaskpb.FieldMask,
403
) error {
204✔
404
        tx, err := p.Begin(ctx)
204✔
405
        if err != nil {
204✔
406
                return err
×
407
        }
×
408
        defer func() { _ = tx.Rollback(ctx) }()
408✔
409

410
        existing, err := getRecord(ctx, tx, record.GetType(), record.GetId(), lockModeUpdate)
204✔
411
        if isNotFound(err) {
206✔
412
                return storage.ErrNotFound
2✔
413
        } else if err != nil {
204✔
414
                return err
×
415
        }
×
416

417
        if err := storage.PatchRecord(existing, record, fields); err != nil {
202✔
418
                return err
×
419
        }
×
420

421
        if err := putRecordAndChange(ctx, tx, record); err != nil {
202✔
422
                return err
×
423
        }
×
424

425
        return tx.Commit(ctx)
202✔
426
}
427

428
func putService(ctx context.Context, q querier, svc *registry.Service, expiresAt time.Time) error {
4✔
429
        query := `
4✔
430
                INSERT INTO ` + schemaName + `.` + servicesTableName + ` (kind, endpoint, expires_at)
4✔
431
                VALUES ($1, $2, $3)
4✔
432
                ON CONFLICT (kind, endpoint) DO UPDATE
4✔
433
                SET expires_at=$3
4✔
434
        `
4✔
435
        _, err := q.Exec(ctx, query, svc.GetKind().String(), svc.GetEndpoint(), expiresAt)
4✔
436
        return err
4✔
437
}
4✔
438

439
func setCheckpoint(ctx context.Context, q querier, serverVersion, recordVersion uint64) error {
1✔
440
        var sv, rv pgtype.Numeric
1✔
441
        err := sv.Scan(strconv.FormatUint(serverVersion, 10))
1✔
442
        if err != nil {
1✔
443
                return err
×
444
        }
×
445
        err = rv.Scan(strconv.FormatUint(recordVersion, 10))
1✔
446
        if err != nil {
1✔
447
                return err
×
448
        }
×
449
        _, err = q.Exec(ctx, `
1✔
450
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
451
                SET server_version=$1, record_version=$2
1✔
452
        `, sv, rv)
1✔
453
        return err
1✔
454
}
455

456
func setOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
2✔
457
        capacity := pgtype.Int8{}
2✔
458
        if options != nil && options.Capacity != nil {
4✔
459
                capacity.Int64 = int64(options.GetCapacity())
2✔
460
                capacity.Valid = true
2✔
461
        }
2✔
462

463
        _, err := q.Exec(ctx, `
2✔
464
                INSERT INTO `+schemaName+`.`+recordOptionsTableName+` (type, capacity)
2✔
465
                VALUES ($1, $2)
2✔
466
                ON CONFLICT (type) DO UPDATE
2✔
467
                SET capacity=$2
2✔
468
        `, recordType, capacity)
2✔
469
        return err
2✔
470
}
471

472
func signalRecordChange(ctx context.Context, q querier) error {
333✔
473
        _, err := q.Exec(ctx, `NOTIFY `+recordChangeNotifyName)
333✔
474
        return err
333✔
475
}
333✔
476

477
func signalServiceChange(ctx context.Context, q querier) error {
2✔
478
        _, err := q.Exec(ctx, `NOTIFY `+serviceChangeNotifyName)
2✔
479
        return err
2✔
480
}
2✔
481

482
func jsonbFromAny(a *anypb.Any) ([]byte, error) {
1,352✔
483
        if a == nil {
2,363✔
484
                return nil, nil
1,011✔
485
        }
1,011✔
486

487
        return protojson.Marshal(a)
341✔
488
}
489

490
func timestamppbFromTimestamptz(ts pgtype.Timestamptz) *timestamppb.Timestamp {
1,050✔
491
        if !ts.Valid {
1,420✔
492
                return nil
370✔
493
        }
370✔
494
        return timestamppb.New(ts.Time)
680✔
495
}
496

497
func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz {
2,732✔
498
        if !ts.IsValid() {
4,083✔
499
                return pgtype.Timestamptz{}
1,351✔
500
        }
1,351✔
501
        return pgtype.Timestamptz{Time: ts.AsTime(), Valid: true}
1,381✔
502
}
503

504
func isNotFound(err error) bool {
2,649✔
505
        return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound)
2,649✔
506
}
2,649✔
507

508
func isUnknownType(err error) bool {
680✔
509
        if err == nil {
1,357✔
510
                return false
677✔
511
        }
677✔
512

513
        return errors.Is(err, protoregistry.NotFound) ||
3✔
514
                strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string
3✔
515
}
516

517
func collectRecord(row pgx.CollectableRow) (*databroker.Record, error) {
371✔
518
        var recordType, id string
371✔
519
        var version uint64
371✔
520
        var data []byte
371✔
521
        var modifiedAt pgtype.Timestamptz
371✔
522
        var deletedAt pgtype.Timestamptz
371✔
523
        err := row.Scan(&recordType, &id, &version, &data, &modifiedAt, &deletedAt)
371✔
524
        if err != nil {
371✔
525
                return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
526
        }
×
527

528
        a, err := protoutil.UnmarshalAnyJSON(data)
371✔
529
        if isUnknownType(err) || len(data) == 0 {
373✔
530
                a = protoutil.ToAny(protoutil.ToStruct(map[string]string{
2✔
531
                        "id": id,
2✔
532
                }))
2✔
533
        } else if err != nil {
371✔
534
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
535
        }
×
536

537
        return &databroker.Record{
371✔
538
                Version:    version,
371✔
539
                Type:       recordType,
371✔
540
                Id:         id,
371✔
541
                Data:       a,
371✔
542
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
371✔
543
                DeletedAt:  timestamppbFromTimestamptz(deletedAt),
371✔
544
        }, nil
371✔
545
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc