• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 22647563461

03 Mar 2026 11:28PM UTC coverage: 44.809% (-0.02%) from 44.825%
22647563461

push

github

web-flow
ci: update dependencies (#6168)

This PR updates dependencies not managed by dependabot.

Co-authored-by: GitHub Actions <apparitor@users.noreply.github.com>

33759 of 75339 relevant lines covered (44.81%)

115.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.08
/pkg/storage/postgres/postgres.go
1
// Package postgres contains an implementation of the storage.Backend backed by postgres.
2
package postgres
3

4
import (
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sort"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/jackc/pgx/v5"
14
        "github.com/jackc/pgx/v5/pgconn"
15
        "github.com/jackc/pgx/v5/pgtype"
16
        "github.com/jackc/pgx/v5/pgxpool"
17
        "google.golang.org/grpc/codes"
18
        "google.golang.org/grpc/status"
19
        "google.golang.org/protobuf/encoding/protojson"
20
        "google.golang.org/protobuf/proto"
21
        "google.golang.org/protobuf/reflect/protoregistry"
22
        "google.golang.org/protobuf/types/known/anypb"
23
        "google.golang.org/protobuf/types/known/durationpb"
24
        "google.golang.org/protobuf/types/known/fieldmaskpb"
25
        "google.golang.org/protobuf/types/known/timestamppb"
26

27
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
28
        "github.com/pomerium/pomerium/pkg/grpc/registry"
29
        "github.com/pomerium/pomerium/pkg/protoutil"
30
        "github.com/pomerium/pomerium/pkg/storage"
31
)
32

33
// recordBatchSize is the row limit bound to the LIMIT placeholder of the paged
// record queries in this file.
const recordBatchSize = 64

// watchPollInterval is a 30 second interval. It is not referenced in this part
// of the file; presumably it paces polling for changes - confirm at the use site.
const watchPollInterval = 30 * time.Second
37

38
// Names of the database objects referenced by the SQL statements in this file.
var (
        // schemaName qualifies every table name used in a statement.
        schemaName = "pomerium"

        // Table names.
        checkpointsTableName   = "checkpoints"
        leasesTableName        = "leases"
        migrationInfoTableName = "migration_info"
        recordChangesTableName = "record_changes"
        recordOptionsTableName = "record_options"
        recordsTableName       = "records"
        servicesTableName      = "services"

        // Channel names used with NOTIFY.
        recordChangeNotifyName  = "pomerium_record_change"
        serviceChangeNotifyName = "pomerium_service_change"
)
50

51
type querier interface {
52
        Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
53
        Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
54
        QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
55
}
56

57
func clearRecords(ctx context.Context, q querier, newServerVersion uint64) error {
1✔
58
        _, err := q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordChangesTableName)
1✔
59
        if err != nil {
1✔
60
                return err
×
61
        }
×
62
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordsTableName)
1✔
63
        if err != nil {
1✔
64
                return err
×
65
        }
×
66
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordOptionsTableName)
1✔
67
        if err != nil {
1✔
68
                return err
×
69
        }
×
70
        _, err = q.Exec(ctx, `
1✔
71
                UPDATE `+schemaName+`.`+migrationInfoTableName+`
1✔
72
                SET server_version = $1
1✔
73
        `, newServerVersion)
1✔
74
        if err != nil {
1✔
75
                return err
×
76
        }
×
77

78
        _, err = q.Exec(ctx, `
1✔
79
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
80
                SET server_version = 0, record_version = 0
1✔
81
        `)
1✔
82
        if err != nil {
1✔
83
                return err
×
84
        }
×
85

86
        return nil
1✔
87
}
88

89
func deleteChangesBefore(ctx context.Context, q querier, cutoff time.Time) error {
6✔
90
        // we always want to keep at least one row in the changes table,
6✔
91
        // so we take the changes table, sort by version descending, skip the first row
6✔
92
        // and then find any changes before the cutoff
6✔
93
        _, err := q.Exec(ctx, `
6✔
94
                WITH t1 AS (
6✔
95
                        SELECT *
6✔
96
                        FROM `+schemaName+`.`+recordChangesTableName+`
6✔
97
                        ORDER BY version DESC
6✔
98
                        OFFSET 1
6✔
99
                        FOR UPDATE SKIP LOCKED
6✔
100
                ), t2 AS (
6✔
101
                        SELECT version
6✔
102
                        FROM t1
6✔
103
                        WHERE modified_at<$1
6✔
104
                        FOR UPDATE SKIP LOCKED
6✔
105
                )
6✔
106
                DELETE FROM `+schemaName+`.`+recordChangesTableName+` t3
6✔
107
                USING t2
6✔
108
                WHERE t2.version=t3.version
6✔
109
        `, cutoff)
6✔
110
        return err
6✔
111
}
6✔
112

113
func deleteExpiredServices(ctx context.Context, q querier, cutoff time.Time) (rowCount int64, err error) {
16✔
114
        cmd, err := q.Exec(ctx, `
16✔
115
                WITH t1 AS (
16✔
116
                        SELECT kind, endpoint
16✔
117
                        FROM `+schemaName+`.`+servicesTableName+`
16✔
118
                        WHERE expires_at<$1
16✔
119
                        FOR UPDATE SKIP LOCKED
16✔
120
                )
16✔
121
                DELETE FROM `+schemaName+`.`+servicesTableName+` t2
16✔
122
                USING t1
16✔
123
                WHERE t1.kind=t2.kind
16✔
124
                  AND t1.endpoint=t2.endpoint
16✔
125
        `, cutoff)
16✔
126
        if err != nil {
16✔
127
                return 0, err
×
128
        }
×
129
        return cmd.RowsAffected(), nil
16✔
130
}
131

132
func deleteExpiredRecords(ctx context.Context, q querier, recordType string, cutoff time.Time) (int64, error) {
4✔
133
        cmd, err := q.Exec(ctx, `
4✔
134
                WITH expired AS (
4✔
135
                        SELECT type, id
4✔
136
                        FROM `+schemaName+`.`+recordsTableName+`
4✔
137
                        WHERE type = $1 AND modified_at < $2
4✔
138
                        FOR UPDATE SKIP LOCKED
4✔
139
                ),
4✔
140
                tombstones AS (
4✔
141
                        INSERT INTO `+schemaName+`.`+recordChangesTableName+` (type, id, data, modified_at, deleted_at)
4✔
142
                        SELECT type, id, NULL, NOW(), NOW()
4✔
143
                        FROM expired
4✔
144
                )
4✔
145
                DELETE FROM `+schemaName+`.`+recordsTableName+` t
4✔
146
                USING expired
4✔
147
                WHERE t.type = expired.type AND t.id = expired.id
4✔
148
        `, recordType, cutoff)
4✔
149
        if err != nil {
4✔
150
                return 0, err
×
151
        }
×
152
        return cmd.RowsAffected(), nil
4✔
153
}
154

155
func enforceOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
154✔
156
        if options == nil || options.Capacity == nil {
287✔
157
                return nil
133✔
158
        }
133✔
159

160
        _, err := q.Exec(ctx, `
21✔
161
                DELETE FROM `+schemaName+`.`+recordsTableName+`
21✔
162
                WHERE type=$1
21✔
163
                  AND id NOT IN (
21✔
164
                        SELECT id
21✔
165
                        FROM `+schemaName+`.`+recordsTableName+`
21✔
166
                        WHERE type=$1
21✔
167
                        ORDER BY version DESC
21✔
168
                        LIMIT $2
21✔
169
                )
21✔
170
        `, recordType, options.GetCapacity())
21✔
171
        return err
21✔
172
}
173

174
func getRecordVersionRange(ctx context.Context, q querier) (earliestRecordVersion, latestRecordVersion uint64, err error) {
75✔
175
        err = q.QueryRow(ctx, `
75✔
176
                SELECT COALESCE(MIN(version), 0), COALESCE(MAX(version), 0)
75✔
177
                FROM `+schemaName+`.`+recordChangesTableName+`
75✔
178
        `).Scan(&earliestRecordVersion, &latestRecordVersion)
75✔
179
        return earliestRecordVersion, latestRecordVersion, err
75✔
180
}
75✔
181

182
func getCheckpoint(ctx context.Context, q querier) (serverVersion, recordVersion uint64, err error) {
1✔
183
        var sv, rv pgtype.Numeric
1✔
184
        err = q.QueryRow(ctx, `
1✔
185
                SELECT server_version, record_version
1✔
186
                FROM `+schemaName+`.`+checkpointsTableName+`
1✔
187
        `).Scan(&sv, &rv)
1✔
188
        return sv.Int.Uint64(), rv.Int.Uint64(), err
1✔
189
}
1✔
190

191
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
198✔
192
        var capacity pgtype.Int8
198✔
193
        var fields pgtype.Array[string]
198✔
194
        var ttlNanos pgtype.Int8
198✔
195
        err := q.QueryRow(ctx, `
198✔
196
                SELECT capacity, fields, ttl
198✔
197
                FROM `+schemaName+`.`+recordOptionsTableName+`
198✔
198
                WHERE type=$1
198✔
199
        `, recordType).Scan(&capacity, &fields, &ttlNanos)
198✔
200
        if isNotFound(err) {
354✔
201
                return nil, status.Error(codes.NotFound, err.Error())
156✔
202
        }
156✔
203
        if err != nil {
42✔
204
                return nil, err
×
205
        }
×
206
        options := new(databroker.Options)
42✔
207
        if capacity.Valid {
71✔
208
                options.Capacity = proto.Uint64(uint64(capacity.Int64))
29✔
209
        }
29✔
210
        if fields.Valid {
54✔
211
                options.IndexableFields = fields.Elements
12✔
212
        }
12✔
213
        if ttlNanos.Valid {
62✔
214
                options.Ttl = durationpb.New(time.Duration(ttlNanos.Int64))
20✔
215
        }
20✔
216
        return options, nil
42✔
217
}
218

219
// lockMode is a row-locking clause. getRecord appends its string value verbatim
// to the end of its SELECT statement.
type lockMode string

const (
        // lockModeNone appends nothing, so the row is read without a locking clause.
        lockModeNone = lockMode("")
        // lockModeUpdate appends FOR UPDATE; patchRecord uses it when reading the
        // record it is about to modify.
        lockModeUpdate = lockMode("FOR UPDATE")
)
225

226
func getRecord(
227
        ctx context.Context, q querier, recordType, recordID string, lockMode lockMode,
228
) (*databroker.Record, error) {
1,315✔
229
        var version uint64
1,315✔
230
        var data []byte
1,315✔
231
        var modifiedAt pgtype.Timestamptz
1,315✔
232
        err := q.QueryRow(ctx, `
1,315✔
233
                SELECT version, data, modified_at
1,315✔
234
                  FROM `+schemaName+`.`+recordsTableName+`
1,315✔
235
                 WHERE type=$1 AND id=$2 `+string(lockMode),
1,315✔
236
                recordType, recordID).Scan(&version, &data, &modifiedAt)
1,315✔
237
        if isNotFound(err) {
2,319✔
238
                return nil, storage.ErrNotFound
1,004✔
239
        } else if err != nil {
1,315✔
240
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
241
        }
×
242

243
        a, err := protoutil.UnmarshalAnyJSON(data)
311✔
244
        if isUnknownType(err) {
312✔
245
                return nil, storage.ErrNotFound
1✔
246
        } else if err != nil {
311✔
247
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
248
        }
×
249

250
        return &databroker.Record{
310✔
251
                Version:    version,
310✔
252
                Type:       recordType,
310✔
253
                Id:         recordID,
310✔
254
                Data:       a,
310✔
255
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
310✔
256
        }, nil
310✔
257
}
258

259
func listChangedRecordsAfter(ctx context.Context, q querier, recordType string, lastRecordVersion uint64) ([]*databroker.Record, error) {
79✔
260
        args := []any{lastRecordVersion, recordBatchSize}
79✔
261
        query := `
79✔
262
                SELECT type, id, version, data, modified_at, deleted_at
79✔
263
                FROM ` + schemaName + `.` + recordChangesTableName + `
79✔
264
                WHERE version>$1
79✔
265
        `
79✔
266
        if recordType != "" {
135✔
267
                args = append(args, recordType)
56✔
268
                query += ` AND type=$3`
56✔
269
        }
56✔
270
        query += `
79✔
271
                ORDER BY version
79✔
272
                LIMIT $2
79✔
273
        `
79✔
274
        rows, err := q.Query(ctx, query, args...)
79✔
275
        if err != nil {
79✔
276
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
277
        }
×
278
        return pgx.CollectRows(rows, collectRecord)
79✔
279
}
280

281
func listLatestRecordsAfter(ctx context.Context, q querier, expr storage.FilterExpression, lastRecordType, lastRecordID string) ([]*databroker.Record, error) {
78✔
282
        args := []any{lastRecordType, lastRecordID, recordBatchSize}
78✔
283
        query := `
78✔
284
                SELECT type, id, version, data, modified_at, NULL::timestamptz
78✔
285
                FROM ` + schemaName + `.` + recordsTableName + `
78✔
286
                WHERE ((type>$1) OR (type=$1 AND id>$2))
78✔
287
        `
78✔
288
        if expr != nil {
144✔
289
                query += "AND "
66✔
290
                err := addFilterExpressionToQuery(&query, &args, expr)
66✔
291
                if err != nil {
66✔
292
                        return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err)
×
293
                }
×
294
        }
295
        query += `
78✔
296
                ORDER BY type, id
78✔
297
                LIMIT $3
78✔
298
        `
78✔
299
        rows, err := q.Query(ctx, query, args...)
78✔
300
        if err != nil {
78✔
301
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
302
        }
×
303
        return pgx.CollectRows(rows, collectRecord)
78✔
304
}
305

306
func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
2✔
307
        var services []*registry.Service
2✔
308

2✔
309
        query := `
2✔
310
                SELECT kind, endpoint
2✔
311
                FROM  ` + schemaName + `.` + servicesTableName + `
2✔
312
                ORDER BY kind, endpoint
2✔
313
        `
2✔
314
        rows, err := q.Query(ctx, query)
2✔
315
        if err != nil {
2✔
316
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
317
        }
×
318
        defer rows.Close()
2✔
319

2✔
320
        for rows.Next() {
5✔
321
                var kind, endpoint string
3✔
322
                err = rows.Scan(&kind, &endpoint)
3✔
323
                if err != nil {
3✔
324
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
325
                }
×
326

327
                services = append(services, &registry.Service{
3✔
328
                        Kind:     registry.ServiceKind(registry.ServiceKind_value[kind]),
3✔
329
                        Endpoint: endpoint,
3✔
330
                })
3✔
331
        }
332
        err = rows.Err()
2✔
333
        if err != nil {
2✔
334
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
335
        }
×
336

337
        return services, nil
2✔
338
}
339

340
func listTypes(ctx context.Context, q querier) ([]string, error) {
11✔
341
        query := `
11✔
342
                SELECT DISTINCT type
11✔
343
                FROM ` + schemaName + `.` + recordsTableName + `
11✔
344
        `
11✔
345
        rows, err := q.Query(ctx, query)
11✔
346
        if err != nil {
21✔
347
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
10✔
348
        }
10✔
349
        defer rows.Close()
1✔
350

1✔
351
        var types []string
1✔
352
        for rows.Next() {
5✔
353
                var recordType string
4✔
354
                err = rows.Scan(&recordType)
4✔
355
                if err != nil {
4✔
356
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
357
                }
×
358

359
                types = append(types, recordType)
4✔
360
        }
361
        err = rows.Err()
1✔
362
        if err != nil {
1✔
363
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
364
        }
×
365

366
        sort.Strings(types)
1✔
367
        return types, nil
1✔
368
}
369

370
func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) {
15✔
371
        tbl := schemaName + "." + leasesTableName
15✔
372
        expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl)))
15✔
373
        now := timestamptzFromTimestamppb(timestamppb.Now())
15✔
374
        err = q.QueryRow(ctx, `
15✔
375
                INSERT INTO `+tbl+` (name, id, expires_at)
15✔
376
                VALUES ($1, $2, $3)
15✔
377
                ON CONFLICT (name) DO UPDATE
15✔
378
                SET id=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $2 ELSE `+tbl+`.id END,
15✔
379
                    expires_at=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $3 ELSE `+tbl+`.expires_at END
15✔
380
                RETURNING `+tbl+`.id
15✔
381
        `, leaseName, leaseID, expiresAt, now).Scan(&leaseHolderID)
15✔
382
        return leaseHolderID, err
15✔
383
}
15✔
384

385
func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error {
1,398✔
386
        data, err := jsonbFromAny(record.GetData())
1,398✔
387
        if err != nil {
1,398✔
388
                return fmt.Errorf("postgres: failed to convert any to json: %w", err)
×
389
        }
×
390

391
        modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt())
1,398✔
392
        deletedAt := timestamptzFromTimestamppb(record.GetDeletedAt())
1,398✔
393
        indexCIDR := &pgtype.Text{Valid: false}
1,398✔
394
        if cidr := storage.GetRecordIndexCIDR(record.GetData()); cidr != nil {
1,402✔
395
                indexCIDR.String = cidr.String()
4✔
396
                indexCIDR.Valid = true
4✔
397
        }
4✔
398

399
        query := `
1,398✔
400
                WITH t1 AS (
1,398✔
401
                        INSERT INTO ` + schemaName + `.` + recordChangesTableName + ` (type, id, data, modified_at, deleted_at)
1,398✔
402
                        VALUES ($1, $2, $3, $4, $5)
1,398✔
403
                        RETURNING *
1,398✔
404
                )
1,398✔
405
        `
1,398✔
406
        args := []any{
1,398✔
407
                record.GetType(), record.GetId(), data, modifiedAt, deletedAt,
1,398✔
408
        }
1,398✔
409
        if record.GetDeletedAt() == nil {
2,794✔
410
                query += `
1,396✔
411
                        INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
1,396✔
412
                        VALUES ($1, $2, (SELECT version FROM t1), $3, $4, $6)
1,396✔
413
                        ON CONFLICT (type, id) DO UPDATE
1,396✔
414
                        SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
1,396✔
415
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
1,396✔
416
                `
1,396✔
417
                args = append(args, indexCIDR)
1,396✔
418
        } else {
1,398✔
419
                query += `
2✔
420
                        DELETE FROM ` + schemaName + `.` + recordsTableName + `
2✔
421
                        WHERE type=$1 AND id=$2
2✔
422
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
2✔
423
                `
2✔
424
        }
2✔
425
        err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
1,398✔
426
        if err != nil && !isNotFound(err) {
2,408✔
427
                return fmt.Errorf("postgres: failed to execute query: %w", err)
1,010✔
428
        }
1,010✔
429

430
        return nil
388✔
431
}
432

433
// patchRecord updates specific fields of an existing record.
434
func patchRecord(
435
        ctx context.Context, p *pgxpool.Pool, record *databroker.Record, fields *fieldmaskpb.FieldMask,
436
) error {
204✔
437
        tx, err := p.Begin(ctx)
204✔
438
        if err != nil {
204✔
439
                return err
×
440
        }
×
441
        defer func() { _ = tx.Rollback(ctx) }()
408✔
442

443
        existing, err := getRecord(ctx, tx, record.GetType(), record.GetId(), lockModeUpdate)
204✔
444
        if isNotFound(err) {
206✔
445
                return storage.ErrNotFound
2✔
446
        } else if err != nil {
204✔
447
                return err
×
448
        }
×
449

450
        if err := storage.PatchRecord(existing, record, fields); err != nil {
202✔
451
                return err
×
452
        }
×
453

454
        if err := putRecordAndChange(ctx, tx, record); err != nil {
202✔
455
                return err
×
456
        }
×
457

458
        return tx.Commit(ctx)
202✔
459
}
460

461
func putService(ctx context.Context, q querier, svc *registry.Service, expiresAt time.Time) error {
4✔
462
        query := `
4✔
463
                INSERT INTO ` + schemaName + `.` + servicesTableName + ` (kind, endpoint, expires_at)
4✔
464
                VALUES ($1, $2, $3)
4✔
465
                ON CONFLICT (kind, endpoint) DO UPDATE
4✔
466
                SET expires_at=$3
4✔
467
        `
4✔
468
        _, err := q.Exec(ctx, query, svc.GetKind().String(), svc.GetEndpoint(), expiresAt)
4✔
469
        return err
4✔
470
}
4✔
471

472
func setCheckpoint(ctx context.Context, q querier, serverVersion, recordVersion uint64) error {
1✔
473
        var sv, rv pgtype.Numeric
1✔
474
        err := sv.Scan(strconv.FormatUint(serverVersion, 10))
1✔
475
        if err != nil {
1✔
476
                return err
×
477
        }
×
478
        err = rv.Scan(strconv.FormatUint(recordVersion, 10))
1✔
479
        if err != nil {
1✔
480
                return err
×
481
        }
×
482
        _, err = q.Exec(ctx, `
1✔
483
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
484
                SET server_version=$1, record_version=$2
1✔
485
        `, sv, rv)
1✔
486
        return err
1✔
487
}
488

489
func setOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
26✔
490
        if proto.Equal(options, new(databroker.Options)) {
27✔
491
                _, err := q.Exec(ctx, `
1✔
492
                        DELETE FROM `+schemaName+`.`+recordOptionsTableName+`
1✔
493
                        WHERE type=$1
1✔
494
                `, recordType)
1✔
495
                return err
1✔
496
        }
1✔
497

498
        capacity := pgtype.Int8{}
25✔
499
        if options != nil && options.Capacity != nil {
35✔
500
                capacity.Int64 = int64(options.GetCapacity())
10✔
501
                capacity.Valid = true
10✔
502
        }
10✔
503

504
        ttl := pgtype.Int8{}
25✔
505
        if options != nil && options.Ttl != nil {
34✔
506
                ttl.Int64 = int64(options.GetTtl().AsDuration())
9✔
507
                ttl.Valid = true
9✔
508
        }
9✔
509

510
        _, err := q.Exec(ctx, `
25✔
511
                INSERT INTO `+schemaName+`.`+recordOptionsTableName+` (type, capacity, fields, ttl)
25✔
512
                VALUES ($1, $2, $3, $4)
25✔
513
                ON CONFLICT (type) DO UPDATE
25✔
514
                SET capacity=$2, fields=$3, ttl=$4
25✔
515
        `, recordType, capacity, options.GetIndexableFields(), ttl)
25✔
516
        return err
25✔
517
}
518

519
func signalRecordChange(ctx context.Context, q querier) error {
357✔
520
        _, err := q.Exec(ctx, `NOTIFY `+recordChangeNotifyName)
357✔
521
        return err
357✔
522
}
357✔
523

524
func signalServiceChange(ctx context.Context, q querier) error {
2✔
525
        _, err := q.Exec(ctx, `NOTIFY `+serviceChangeNotifyName)
2✔
526
        return err
2✔
527
}
2✔
528

529
func jsonbFromAny(a *anypb.Any) ([]byte, error) {
1,398✔
530
        if a == nil {
2,409✔
531
                return nil, nil
1,011✔
532
        }
1,011✔
533

534
        return protojson.Marshal(a)
387✔
535
}
536

537
func timestamppbFromTimestamptz(ts pgtype.Timestamptz) *timestamppb.Timestamp {
1,178✔
538
        if !ts.Valid {
1,609✔
539
                return nil
431✔
540
        }
431✔
541
        return timestamppb.New(ts.Time)
747✔
542
}
543

544
func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz {
2,826✔
545
        if !ts.IsValid() {
4,222✔
546
                return pgtype.Timestamptz{}
1,396✔
547
        }
1,396✔
548
        return pgtype.Timestamptz{Time: ts.AsTime(), Valid: true}
1,430✔
549
}
550

551
func isNotFound(err error) bool {
2,728✔
552
        return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound)
2,728✔
553
}
2,728✔
554

555
func isUnknownType(err error) bool {
745✔
556
        if err == nil {
1,485✔
557
                return false
740✔
558
        }
740✔
559

560
        return errors.Is(err, protoregistry.NotFound) ||
5✔
561
                strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string
5✔
562
}
563

564
func collectRecord(row pgx.CollectableRow) (*databroker.Record, error) {
434✔
565
        var recordType, id string
434✔
566
        var version uint64
434✔
567
        var data []byte
434✔
568
        var modifiedAt pgtype.Timestamptz
434✔
569
        var deletedAt pgtype.Timestamptz
434✔
570
        err := row.Scan(&recordType, &id, &version, &data, &modifiedAt, &deletedAt)
434✔
571
        if err != nil {
434✔
572
                return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
573
        }
×
574

575
        a, err := protoutil.UnmarshalAnyJSON(data)
434✔
576
        if isUnknownType(err) || len(data) == 0 {
438✔
577
                a = protoutil.ToAny(protoutil.ToStruct(map[string]string{
4✔
578
                        "id": id,
4✔
579
                }))
4✔
580
        } else if err != nil {
434✔
581
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
582
        }
×
583

584
        return &databroker.Record{
434✔
585
                Version:    version,
434✔
586
                Type:       recordType,
434✔
587
                Id:         id,
434✔
588
                Data:       a,
434✔
589
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
434✔
590
                DeletedAt:  timestamppbFromTimestamptz(deletedAt),
434✔
591
        }, nil
434✔
592
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc