• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 20582412022

29 Dec 2025 08:49PM UTC coverage: 52.923% (-0.02%) from 52.947%
20582412022

push

github

web-flow
dependabot: split go deps into subgroups (#6008)

## Summary

Go dependency updates sometimes require resolving issues across multiple
groups of packages, so it would likely be more convenient to have them
update with more granularity,
e.g. https://github.com/pomerium/pomerium/pull/6005

## Related issues

<!-- For example...
- #159
-->

## User Explanation

<!-- How would you explain this change to the user? If this
change doesn't create any user-facing changes, you can leave
this blank. If filled out, add the `docs` label -->

## Checklist

- [ ] reference any related issues
- [ ] updated unit tests
- [ ] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [ ] ready for review

29541 of 55819 relevant lines covered (52.92%)

126.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.71
/pkg/storage/postgres/postgres.go
1
// Package postgres contains an implementation of the storage.Backend backed by postgres.
2
package postgres
3

4
import (
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sort"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/jackc/pgx/v5"
14
        "github.com/jackc/pgx/v5/pgconn"
15
        "github.com/jackc/pgx/v5/pgtype"
16
        "github.com/jackc/pgx/v5/pgxpool"
17
        "google.golang.org/grpc/codes"
18
        "google.golang.org/grpc/status"
19
        "google.golang.org/protobuf/encoding/protojson"
20
        "google.golang.org/protobuf/proto"
21
        "google.golang.org/protobuf/reflect/protoregistry"
22
        "google.golang.org/protobuf/types/known/anypb"
23
        "google.golang.org/protobuf/types/known/fieldmaskpb"
24
        "google.golang.org/protobuf/types/known/timestamppb"
25

26
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
27
        "github.com/pomerium/pomerium/pkg/grpc/registry"
28
        "github.com/pomerium/pomerium/pkg/protoutil"
29
        "github.com/pomerium/pomerium/pkg/storage"
30
)
31

32
const (
33
        recordBatchSize   = 64
34
        watchPollInterval = 30 * time.Second
35
)
36

37
// Names of the postgres schema, tables and NOTIFY channels used by the queries
// in this file. Table names are always qualified with schemaName.
var (
	schemaName = "pomerium"

	// tables
	migrationInfoTableName = "migration_info"
	recordsTableName       = "records"
	recordChangesTableName = "record_changes"
	recordOptionsTableName = "record_options"
	leasesTableName        = "leases"
	servicesTableName      = "services"
	checkpointsTableName   = "checkpoints"

	// NOTIFY channels
	recordChangeNotifyName  = "pomerium_record_change"
	serviceChangeNotifyName = "pomerium_service_change"
)
49

50
type querier interface {
51
        Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
52
        Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
53
        QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
54
}
55

56
func clearRecords(ctx context.Context, q querier, newServerVersion uint64) error {
1✔
57
        _, err := q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordChangesTableName)
1✔
58
        if err != nil {
1✔
59
                return err
×
60
        }
×
61
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordsTableName)
1✔
62
        if err != nil {
1✔
63
                return err
×
64
        }
×
65
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordOptionsTableName)
1✔
66
        if err != nil {
1✔
67
                return err
×
68
        }
×
69
        _, err = q.Exec(ctx, `
1✔
70
                UPDATE `+schemaName+`.`+migrationInfoTableName+`
1✔
71
                SET server_version = $1
1✔
72
        `, newServerVersion)
1✔
73
        if err != nil {
1✔
74
                return err
×
75
        }
×
76

77
        _, err = q.Exec(ctx, `
1✔
78
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
79
                SET server_version = 0, record_version = 0
1✔
80
        `)
1✔
81
        if err != nil {
1✔
82
                return err
×
83
        }
×
84

85
        return nil
1✔
86
}
87

88
func deleteChangesBefore(ctx context.Context, q querier, cutoff time.Time) error {
1✔
89
        // we always want to keep at least one row in the changes table,
1✔
90
        // so we take the changes table, sort by version descending, skip the first row
1✔
91
        // and then find any changes before the cutoff
1✔
92
        _, err := q.Exec(ctx, `
1✔
93
                WITH t1 AS (
1✔
94
                        SELECT *
1✔
95
                        FROM `+schemaName+`.`+recordChangesTableName+`
1✔
96
                        ORDER BY version DESC
1✔
97
                        OFFSET 1
1✔
98
                        FOR UPDATE SKIP LOCKED
1✔
99
                ), t2 AS (
1✔
100
                        SELECT version
1✔
101
                        FROM t1
1✔
102
                        WHERE modified_at<$1
1✔
103
                        FOR UPDATE SKIP LOCKED
1✔
104
                )
1✔
105
                DELETE FROM `+schemaName+`.`+recordChangesTableName+` t3
1✔
106
                USING t2
1✔
107
                WHERE t2.version=t3.version
1✔
108
        `, cutoff)
1✔
109
        return err
1✔
110
}
1✔
111

112
func deleteExpiredServices(ctx context.Context, q querier, cutoff time.Time) (rowCount int64, err error) {
16✔
113
        cmd, err := q.Exec(ctx, `
16✔
114
                WITH t1 AS (
16✔
115
                        SELECT kind, endpoint
16✔
116
                        FROM `+schemaName+`.`+servicesTableName+`
16✔
117
                        WHERE expires_at<$1
16✔
118
                        FOR UPDATE SKIP LOCKED
16✔
119
                )
16✔
120
                DELETE FROM `+schemaName+`.`+servicesTableName+` t2
16✔
121
                USING t1
16✔
122
                WHERE t1.kind=t2.kind
16✔
123
                  AND t1.endpoint=t2.endpoint
16✔
124
        `, cutoff)
16✔
125
        if err != nil {
16✔
126
                return 0, err
×
127
        }
×
128
        return cmd.RowsAffected(), nil
16✔
129
}
130

131
func enforceOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
139✔
132
        if options == nil || options.Capacity == nil {
267✔
133
                return nil
128✔
134
        }
128✔
135

136
        _, err := q.Exec(ctx, `
11✔
137
                DELETE FROM `+schemaName+`.`+recordsTableName+`
11✔
138
                WHERE type=$1
11✔
139
                  AND id NOT IN (
11✔
140
                        SELECT id
11✔
141
                        FROM `+schemaName+`.`+recordsTableName+`
11✔
142
                        WHERE type=$1
11✔
143
                        ORDER BY version DESC
11✔
144
                        LIMIT $2
11✔
145
                )
11✔
146
        `, recordType, options.GetCapacity())
11✔
147
        return err
11✔
148
}
149

150
func getRecordVersionRange(ctx context.Context, q querier) (earliestRecordVersion, latestRecordVersion uint64, err error) {
61✔
151
        err = q.QueryRow(ctx, `
61✔
152
                SELECT COALESCE(MIN(version), 0), COALESCE(MAX(version), 0)
61✔
153
                FROM `+schemaName+`.`+recordChangesTableName+`
61✔
154
        `).Scan(&earliestRecordVersion, &latestRecordVersion)
61✔
155
        return earliestRecordVersion, latestRecordVersion, err
61✔
156
}
61✔
157

158
func getCheckpoint(ctx context.Context, q querier) (serverVersion, recordVersion uint64, err error) {
1✔
159
        var sv, rv pgtype.Numeric
1✔
160
        err = q.QueryRow(ctx, `
1✔
161
                SELECT server_version, record_version
1✔
162
                FROM `+schemaName+`.`+checkpointsTableName+`
1✔
163
        `).Scan(&sv, &rv)
1✔
164
        return sv.Int.Uint64(), rv.Int.Uint64(), err
1✔
165
}
1✔
166

167
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
169✔
168
        var capacity pgtype.Int8
169✔
169
        var fields pgtype.Array[string]
169✔
170
        err := q.QueryRow(ctx, `
169✔
171
                SELECT capacity, fields
169✔
172
                FROM `+schemaName+`.`+recordOptionsTableName+`
169✔
173
                WHERE type=$1
169✔
174
        `, recordType).Scan(&capacity, &fields)
169✔
175
        if isNotFound(err) {
314✔
176
                return nil, status.Error(codes.NotFound, err.Error())
145✔
177
        }
145✔
178
        if err != nil {
24✔
179
                return nil, err
×
180
        }
×
181
        options := new(databroker.Options)
24✔
182
        if capacity.Valid {
41✔
183
                options.Capacity = proto.Uint64(uint64(capacity.Int64))
17✔
184
        }
17✔
185
        if fields.Valid {
35✔
186
                options.IndexableFields = fields.Elements
11✔
187
        }
11✔
188
        return options, nil
24✔
189
}
190

191
// lockMode is a row-locking clause appended verbatim to the end of a SELECT
// statement (see getRecord).
type lockMode string

const (
	// lockModeNone appends nothing, so no row lock is requested.
	lockModeNone lockMode = ""
	// lockModeUpdate appends FOR UPDATE to the query.
	lockModeUpdate lockMode = "FOR UPDATE"
)
197

198
func getRecord(
199
        ctx context.Context, q querier, recordType, recordID string, lockMode lockMode,
200
) (*databroker.Record, error) {
1,313✔
201
        var version uint64
1,313✔
202
        var data []byte
1,313✔
203
        var modifiedAt pgtype.Timestamptz
1,313✔
204
        err := q.QueryRow(ctx, `
1,313✔
205
                SELECT version, data, modified_at
1,313✔
206
                  FROM `+schemaName+`.`+recordsTableName+`
1,313✔
207
                 WHERE type=$1 AND id=$2 `+string(lockMode),
1,313✔
208
                recordType, recordID).Scan(&version, &data, &modifiedAt)
1,313✔
209
        if isNotFound(err) {
2,317✔
210
                return nil, storage.ErrNotFound
1,004✔
211
        } else if err != nil {
1,313✔
212
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
213
        }
×
214

215
        a, err := protoutil.UnmarshalAnyJSON(data)
309✔
216
        if isUnknownType(err) {
310✔
217
                return nil, storage.ErrNotFound
1✔
218
        } else if err != nil {
309✔
219
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
220
        }
×
221

222
        return &databroker.Record{
308✔
223
                Version:    version,
308✔
224
                Type:       recordType,
308✔
225
                Id:         recordID,
308✔
226
                Data:       a,
308✔
227
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
308✔
228
        }, nil
308✔
229
}
230

231
func listChangedRecordsAfter(ctx context.Context, q querier, recordType string, lastRecordVersion uint64) ([]*databroker.Record, error) {
62✔
232
        args := []any{lastRecordVersion, recordBatchSize}
62✔
233
        query := `
62✔
234
                SELECT type, id, version, data, modified_at, deleted_at
62✔
235
                FROM ` + schemaName + `.` + recordChangesTableName + `
62✔
236
                WHERE version>$1
62✔
237
        `
62✔
238
        if recordType != "" {
102✔
239
                args = append(args, recordType)
40✔
240
                query += ` AND type=$3`
40✔
241
        }
40✔
242
        query += `
62✔
243
                ORDER BY version
62✔
244
                LIMIT $2
62✔
245
        `
62✔
246
        rows, err := q.Query(ctx, query, args...)
62✔
247
        if err != nil {
62✔
248
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
249
        }
×
250
        return pgx.CollectRows(rows, collectRecord)
62✔
251
}
252

253
func listLatestRecordsAfter(ctx context.Context, q querier, expr storage.FilterExpression, lastRecordType, lastRecordID string) ([]*databroker.Record, error) {
68✔
254
        args := []any{lastRecordType, lastRecordID, recordBatchSize}
68✔
255
        query := `
68✔
256
                SELECT type, id, version, data, modified_at, NULL::timestamptz
68✔
257
                FROM ` + schemaName + `.` + recordsTableName + `
68✔
258
                WHERE ((type>$1) OR (type=$1 AND id>$2))
68✔
259
        `
68✔
260
        if expr != nil {
124✔
261
                query += "AND "
56✔
262
                err := addFilterExpressionToQuery(&query, &args, expr)
56✔
263
                if err != nil {
56✔
264
                        return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err)
×
265
                }
×
266
        }
267
        query += `
68✔
268
                ORDER BY type, id
68✔
269
                LIMIT $3
68✔
270
        `
68✔
271
        rows, err := q.Query(ctx, query, args...)
68✔
272
        if err != nil {
68✔
273
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
274
        }
×
275
        return pgx.CollectRows(rows, collectRecord)
68✔
276
}
277

278
func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
2✔
279
        var services []*registry.Service
2✔
280

2✔
281
        query := `
2✔
282
                SELECT kind, endpoint
2✔
283
                FROM  ` + schemaName + `.` + servicesTableName + `
2✔
284
                ORDER BY kind, endpoint
2✔
285
        `
2✔
286
        rows, err := q.Query(ctx, query)
2✔
287
        if err != nil {
2✔
288
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
289
        }
×
290
        defer rows.Close()
2✔
291

2✔
292
        for rows.Next() {
5✔
293
                var kind, endpoint string
3✔
294
                err = rows.Scan(&kind, &endpoint)
3✔
295
                if err != nil {
3✔
296
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
297
                }
×
298

299
                services = append(services, &registry.Service{
3✔
300
                        Kind:     registry.ServiceKind(registry.ServiceKind_value[kind]),
3✔
301
                        Endpoint: endpoint,
3✔
302
                })
3✔
303
        }
304
        err = rows.Err()
2✔
305
        if err != nil {
2✔
306
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
307
        }
×
308

309
        return services, nil
2✔
310
}
311

312
func listTypes(ctx context.Context, q querier) ([]string, error) {
11✔
313
        query := `
11✔
314
                SELECT DISTINCT type
11✔
315
                FROM ` + schemaName + `.` + recordsTableName + `
11✔
316
        `
11✔
317
        rows, err := q.Query(ctx, query)
11✔
318
        if err != nil {
21✔
319
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
10✔
320
        }
10✔
321
        defer rows.Close()
1✔
322

1✔
323
        var types []string
1✔
324
        for rows.Next() {
5✔
325
                var recordType string
4✔
326
                err = rows.Scan(&recordType)
4✔
327
                if err != nil {
4✔
328
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
329
                }
×
330

331
                types = append(types, recordType)
4✔
332
        }
333
        err = rows.Err()
1✔
334
        if err != nil {
1✔
335
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
336
        }
×
337

338
        sort.Strings(types)
1✔
339
        return types, nil
1✔
340
}
341

342
func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) {
14✔
343
        tbl := schemaName + "." + leasesTableName
14✔
344
        expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl)))
14✔
345
        now := timestamptzFromTimestamppb(timestamppb.Now())
14✔
346
        err = q.QueryRow(ctx, `
14✔
347
                INSERT INTO `+tbl+` (name, id, expires_at)
14✔
348
                VALUES ($1, $2, $3)
14✔
349
                ON CONFLICT (name) DO UPDATE
14✔
350
                SET id=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $2 ELSE `+tbl+`.id END,
14✔
351
                    expires_at=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $3 ELSE `+tbl+`.expires_at END
14✔
352
                RETURNING `+tbl+`.id
14✔
353
        `, leaseName, leaseID, expiresAt, now).Scan(&leaseHolderID)
14✔
354
        return leaseHolderID, err
14✔
355
}
14✔
356

357
func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error {
1,380✔
358
        data, err := jsonbFromAny(record.GetData())
1,380✔
359
        if err != nil {
1,380✔
360
                return fmt.Errorf("postgres: failed to convert any to json: %w", err)
×
361
        }
×
362

363
        modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt())
1,380✔
364
        deletedAt := timestamptzFromTimestamppb(record.GetDeletedAt())
1,380✔
365
        indexCIDR := &pgtype.Text{Valid: false}
1,380✔
366
        if cidr := storage.GetRecordIndexCIDR(record.GetData()); cidr != nil {
1,384✔
367
                indexCIDR.String = cidr.String()
4✔
368
                indexCIDR.Valid = true
4✔
369
        }
4✔
370

371
        query := `
1,380✔
372
                WITH t1 AS (
1,380✔
373
                        INSERT INTO ` + schemaName + `.` + recordChangesTableName + ` (type, id, data, modified_at, deleted_at)
1,380✔
374
                        VALUES ($1, $2, $3, $4, $5)
1,380✔
375
                        RETURNING *
1,380✔
376
                )
1,380✔
377
        `
1,380✔
378
        args := []any{
1,380✔
379
                record.GetType(), record.GetId(), data, modifiedAt, deletedAt,
1,380✔
380
        }
1,380✔
381
        if record.GetDeletedAt() == nil {
2,758✔
382
                query += `
1,378✔
383
                        INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
1,378✔
384
                        VALUES ($1, $2, (SELECT version FROM t1), $3, $4, $6)
1,378✔
385
                        ON CONFLICT (type, id) DO UPDATE
1,378✔
386
                        SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
1,378✔
387
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
1,378✔
388
                `
1,378✔
389
                args = append(args, indexCIDR)
1,378✔
390
        } else {
1,380✔
391
                query += `
2✔
392
                        DELETE FROM ` + schemaName + `.` + recordsTableName + `
2✔
393
                        WHERE type=$1 AND id=$2
2✔
394
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
2✔
395
                `
2✔
396
        }
2✔
397
        err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
1,380✔
398
        if err != nil && !isNotFound(err) {
2,390✔
399
                return fmt.Errorf("postgres: failed to execute query: %w", err)
1,010✔
400
        }
1,010✔
401

402
        return nil
370✔
403
}
404

405
// patchRecord updates specific fields of an existing record.
406
func patchRecord(
407
        ctx context.Context, p *pgxpool.Pool, record *databroker.Record, fields *fieldmaskpb.FieldMask,
408
) error {
204✔
409
        tx, err := p.Begin(ctx)
204✔
410
        if err != nil {
204✔
411
                return err
×
412
        }
×
413
        defer func() { _ = tx.Rollback(ctx) }()
408✔
414

415
        existing, err := getRecord(ctx, tx, record.GetType(), record.GetId(), lockModeUpdate)
204✔
416
        if isNotFound(err) {
206✔
417
                return storage.ErrNotFound
2✔
418
        } else if err != nil {
204✔
419
                return err
×
420
        }
×
421

422
        if err := storage.PatchRecord(existing, record, fields); err != nil {
202✔
423
                return err
×
424
        }
×
425

426
        if err := putRecordAndChange(ctx, tx, record); err != nil {
202✔
427
                return err
×
428
        }
×
429

430
        return tx.Commit(ctx)
202✔
431
}
432

433
func putService(ctx context.Context, q querier, svc *registry.Service, expiresAt time.Time) error {
4✔
434
        query := `
4✔
435
                INSERT INTO ` + schemaName + `.` + servicesTableName + ` (kind, endpoint, expires_at)
4✔
436
                VALUES ($1, $2, $3)
4✔
437
                ON CONFLICT (kind, endpoint) DO UPDATE
4✔
438
                SET expires_at=$3
4✔
439
        `
4✔
440
        _, err := q.Exec(ctx, query, svc.GetKind().String(), svc.GetEndpoint(), expiresAt)
4✔
441
        return err
4✔
442
}
4✔
443

444
func setCheckpoint(ctx context.Context, q querier, serverVersion, recordVersion uint64) error {
1✔
445
        var sv, rv pgtype.Numeric
1✔
446
        err := sv.Scan(strconv.FormatUint(serverVersion, 10))
1✔
447
        if err != nil {
1✔
448
                return err
×
449
        }
×
450
        err = rv.Scan(strconv.FormatUint(recordVersion, 10))
1✔
451
        if err != nil {
1✔
452
                return err
×
453
        }
×
454
        _, err = q.Exec(ctx, `
1✔
455
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
456
                SET server_version=$1, record_version=$2
1✔
457
        `, sv, rv)
1✔
458
        return err
1✔
459
}
460

461
func setOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
17✔
462
        if proto.Equal(options, new(databroker.Options)) {
18✔
463
                _, err := q.Exec(ctx, `
1✔
464
                        DELETE FROM `+schemaName+`.`+recordOptionsTableName+`
1✔
465
                        WHERE type=$1
1✔
466
                `, recordType)
1✔
467
                return err
1✔
468
        }
1✔
469

470
        capacity := pgtype.Int8{}
16✔
471
        if options != nil && options.Capacity != nil {
23✔
472
                capacity.Int64 = int64(options.GetCapacity())
7✔
473
                capacity.Valid = true
7✔
474
        }
7✔
475

476
        _, err := q.Exec(ctx, `
16✔
477
                INSERT INTO `+schemaName+`.`+recordOptionsTableName+` (type, capacity, fields)
16✔
478
                VALUES ($1, $2, $3)
16✔
479
                ON CONFLICT (type) DO UPDATE
16✔
480
                SET capacity=$2, fields=$3
16✔
481
        `, recordType, capacity, options.GetIndexableFields())
16✔
482
        return err
16✔
483
}
484

485
func signalRecordChange(ctx context.Context, q querier) error {
339✔
486
        _, err := q.Exec(ctx, `NOTIFY `+recordChangeNotifyName)
339✔
487
        return err
339✔
488
}
339✔
489

490
func signalServiceChange(ctx context.Context, q querier) error {
2✔
491
        _, err := q.Exec(ctx, `NOTIFY `+serviceChangeNotifyName)
2✔
492
        return err
2✔
493
}
2✔
494

495
func jsonbFromAny(a *anypb.Any) ([]byte, error) {
1,380✔
496
        if a == nil {
2,391✔
497
                return nil, nil
1,011✔
498
        }
1,011✔
499

500
        return protojson.Marshal(a)
369✔
501
}
502

503
func timestamppbFromTimestamptz(ts pgtype.Timestamptz) *timestamppb.Timestamp {
1,150✔
504
        if !ts.Valid {
1,570✔
505
                return nil
420✔
506
        }
420✔
507
        return timestamppb.New(ts.Time)
730✔
508
}
509

510
func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz {
2,788✔
511
        if !ts.IsValid() {
4,166✔
512
                return pgtype.Timestamptz{}
1,378✔
513
        }
1,378✔
514
        return pgtype.Timestamptz{Time: ts.AsTime(), Valid: true}
1,410✔
515
}
516

517
func isNotFound(err error) bool {
2,697✔
518
        return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound)
2,697✔
519
}
2,697✔
520

521
func isUnknownType(err error) bool {
730✔
522
        if err == nil {
1,457✔
523
                return false
727✔
524
        }
727✔
525

526
        return errors.Is(err, protoregistry.NotFound) ||
3✔
527
                strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string
3✔
528
}
529

530
func collectRecord(row pgx.CollectableRow) (*databroker.Record, error) {
421✔
531
        var recordType, id string
421✔
532
        var version uint64
421✔
533
        var data []byte
421✔
534
        var modifiedAt pgtype.Timestamptz
421✔
535
        var deletedAt pgtype.Timestamptz
421✔
536
        err := row.Scan(&recordType, &id, &version, &data, &modifiedAt, &deletedAt)
421✔
537
        if err != nil {
421✔
538
                return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
539
        }
×
540

541
        a, err := protoutil.UnmarshalAnyJSON(data)
421✔
542
        if isUnknownType(err) || len(data) == 0 {
423✔
543
                a = protoutil.ToAny(protoutil.ToStruct(map[string]string{
2✔
544
                        "id": id,
2✔
545
                }))
2✔
546
        } else if err != nil {
421✔
547
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
548
        }
×
549

550
        return &databroker.Record{
421✔
551
                Version:    version,
421✔
552
                Type:       recordType,
421✔
553
                Id:         id,
421✔
554
                Data:       a,
421✔
555
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
421✔
556
                DeletedAt:  timestamppbFromTimestamptz(deletedAt),
421✔
557
        }, nil
421✔
558
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc