• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 21600807522

02 Feb 2026 05:39PM UTC coverage: 44.618% (-0.009%) from 44.627%
21600807522

push

github

web-flow
authorize: add missing MCP explanation trace (#6081)

There is special-case logic in the handleResultDenied() method to add
some MCP-specific error page explanation text. However there is also an
MCP-specific error page shown in the requireLoginResponse() method,
which does not currently have the special error page explanation text.

Extract the logic for adding the error page explanation text to its own
method, and call this new method from both MCP error code paths.

11 of 11 new or added lines in 1 file covered. (100.0%)

16 existing lines in 5 files now uncovered.

31036 of 69560 relevant lines covered (44.62%)

102.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.18
/pkg/storage/postgres/postgres.go
1
// Package postgres contains an implementation of the storage.Backend backed by postgres.
2
package postgres
3

4
import (
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sort"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/jackc/pgx/v5"
14
        "github.com/jackc/pgx/v5/pgconn"
15
        "github.com/jackc/pgx/v5/pgtype"
16
        "github.com/jackc/pgx/v5/pgxpool"
17
        "google.golang.org/grpc/codes"
18
        "google.golang.org/grpc/status"
19
        "google.golang.org/protobuf/encoding/protojson"
20
        "google.golang.org/protobuf/proto"
21
        "google.golang.org/protobuf/reflect/protoregistry"
22
        "google.golang.org/protobuf/types/known/anypb"
23
        "google.golang.org/protobuf/types/known/fieldmaskpb"
24
        "google.golang.org/protobuf/types/known/timestamppb"
25

26
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
27
        "github.com/pomerium/pomerium/pkg/grpc/registry"
28
        "github.com/pomerium/pomerium/pkg/protoutil"
29
        "github.com/pomerium/pomerium/pkg/storage"
30
)
31

32
const (
33
        recordBatchSize   = 64
34
        watchPollInterval = 30 * time.Second
35
)
36

37
var (
38
        schemaName              = "pomerium"
39
        migrationInfoTableName  = "migration_info"
40
        recordsTableName        = "records"
41
        recordChangesTableName  = "record_changes"
42
        recordChangeNotifyName  = "pomerium_record_change"
43
        recordOptionsTableName  = "record_options"
44
        leasesTableName         = "leases"
45
        serviceChangeNotifyName = "pomerium_service_change"
46
        servicesTableName       = "services"
47
        checkpointsTableName    = "checkpoints"
48
)
49

50
type querier interface {
51
        Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
52
        Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
53
        QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
54
}
55

56
func clearRecords(ctx context.Context, q querier, newServerVersion uint64) error {
1✔
57
        _, err := q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordChangesTableName)
1✔
58
        if err != nil {
1✔
59
                return err
×
60
        }
×
61
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordsTableName)
1✔
62
        if err != nil {
1✔
63
                return err
×
64
        }
×
65
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordOptionsTableName)
1✔
66
        if err != nil {
1✔
67
                return err
×
68
        }
×
69
        _, err = q.Exec(ctx, `
1✔
70
                UPDATE `+schemaName+`.`+migrationInfoTableName+`
1✔
71
                SET server_version = $1
1✔
72
        `, newServerVersion)
1✔
73
        if err != nil {
1✔
74
                return err
×
75
        }
×
76

77
        _, err = q.Exec(ctx, `
1✔
78
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
79
                SET server_version = 0, record_version = 0
1✔
80
        `)
1✔
81
        if err != nil {
1✔
82
                return err
×
83
        }
×
84

85
        return nil
1✔
86
}
87

88
func deleteChangesBefore(ctx context.Context, q querier, cutoff time.Time) error {
1✔
89
        // we always want to keep at least one row in the changes table,
1✔
90
        // so we take the changes table, sort by version descending, skip the first row
1✔
91
        // and then find any changes before the cutoff
1✔
92
        _, err := q.Exec(ctx, `
1✔
93
                WITH t1 AS (
1✔
94
                        SELECT *
1✔
95
                        FROM `+schemaName+`.`+recordChangesTableName+`
1✔
96
                        ORDER BY version DESC
1✔
97
                        OFFSET 1
1✔
98
                        FOR UPDATE SKIP LOCKED
1✔
99
                ), t2 AS (
1✔
100
                        SELECT version
1✔
101
                        FROM t1
1✔
102
                        WHERE modified_at<$1
1✔
103
                        FOR UPDATE SKIP LOCKED
1✔
104
                )
1✔
105
                DELETE FROM `+schemaName+`.`+recordChangesTableName+` t3
1✔
106
                USING t2
1✔
107
                WHERE t2.version=t3.version
1✔
108
        `, cutoff)
1✔
109
        return err
1✔
110
}
1✔
111

112
func deleteExpiredServices(ctx context.Context, q querier, cutoff time.Time) (rowCount int64, err error) {
15✔
113
        cmd, err := q.Exec(ctx, `
15✔
114
                WITH t1 AS (
15✔
115
                        SELECT kind, endpoint
15✔
116
                        FROM `+schemaName+`.`+servicesTableName+`
15✔
117
                        WHERE expires_at<$1
15✔
118
                        FOR UPDATE SKIP LOCKED
15✔
119
                )
15✔
120
                DELETE FROM `+schemaName+`.`+servicesTableName+` t2
15✔
121
                USING t1
15✔
122
                WHERE t1.kind=t2.kind
15✔
123
                  AND t1.endpoint=t2.endpoint
15✔
124
        `, cutoff)
15✔
125
        if err != nil {
16✔
126
                return 0, err
1✔
127
        }
1✔
128
        return cmd.RowsAffected(), nil
14✔
129
}
130

131
func enforceOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
139✔
132
        if options == nil || options.Capacity == nil {
267✔
133
                return nil
128✔
134
        }
128✔
135

136
        _, err := q.Exec(ctx, `
11✔
137
                DELETE FROM `+schemaName+`.`+recordsTableName+`
11✔
138
                WHERE type=$1
11✔
139
                  AND id NOT IN (
11✔
140
                        SELECT id
11✔
141
                        FROM `+schemaName+`.`+recordsTableName+`
11✔
142
                        WHERE type=$1
11✔
143
                        ORDER BY version DESC
11✔
144
                        LIMIT $2
11✔
145
                )
11✔
146
        `, recordType, options.GetCapacity())
11✔
147
        return err
11✔
148
}
149

150
func getRecordVersionRange(ctx context.Context, q querier) (earliestRecordVersion, latestRecordVersion uint64, err error) {
63✔
151
        err = q.QueryRow(ctx, `
63✔
152
                SELECT COALESCE(MIN(version), 0), COALESCE(MAX(version), 0)
63✔
153
                FROM `+schemaName+`.`+recordChangesTableName+`
63✔
154
        `).Scan(&earliestRecordVersion, &latestRecordVersion)
63✔
155
        return earliestRecordVersion, latestRecordVersion, err
63✔
156
}
63✔
157

158
func getCheckpoint(ctx context.Context, q querier) (serverVersion, recordVersion uint64, err error) {
1✔
159
        var sv, rv pgtype.Numeric
1✔
160
        err = q.QueryRow(ctx, `
1✔
161
                SELECT server_version, record_version
1✔
162
                FROM `+schemaName+`.`+checkpointsTableName+`
1✔
163
        `).Scan(&sv, &rv)
1✔
164
        return sv.Int.Uint64(), rv.Int.Uint64(), err
1✔
165
}
1✔
166

167
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
171✔
168
        var capacity pgtype.Int8
171✔
169
        var fields pgtype.Array[string]
171✔
170
        err := q.QueryRow(ctx, `
171✔
171
                SELECT capacity, fields
171✔
172
                FROM `+schemaName+`.`+recordOptionsTableName+`
171✔
173
                WHERE type=$1
171✔
174
        `, recordType).Scan(&capacity, &fields)
171✔
175
        if isNotFound(err) {
318✔
176
                return nil, status.Error(codes.NotFound, err.Error())
147✔
177
        }
147✔
178
        if err != nil {
24✔
179
                return nil, err
×
180
        }
×
181
        options := new(databroker.Options)
24✔
182
        if capacity.Valid {
41✔
183
                options.Capacity = proto.Uint64(uint64(capacity.Int64))
17✔
184
        }
17✔
185
        if fields.Valid {
35✔
186
                options.IndexableFields = fields.Elements
11✔
187
        }
11✔
188
        return options, nil
24✔
189
}
190

191
type lockMode string
192

193
const (
194
        lockModeNone   lockMode = ""
195
        lockModeUpdate lockMode = "FOR UPDATE"
196
)
197

198
func getRecord(
199
        ctx context.Context, q querier, recordType, recordID string, lockMode lockMode,
200
) (*databroker.Record, error) {
1,313✔
201
        var version uint64
1,313✔
202
        var data []byte
1,313✔
203
        var modifiedAt pgtype.Timestamptz
1,313✔
204
        err := q.QueryRow(ctx, `
1,313✔
205
                SELECT version, data, modified_at
1,313✔
206
                  FROM `+schemaName+`.`+recordsTableName+`
1,313✔
207
                 WHERE type=$1 AND id=$2 `+string(lockMode),
1,313✔
208
                recordType, recordID).Scan(&version, &data, &modifiedAt)
1,313✔
209
        if isNotFound(err) {
2,317✔
210
                return nil, storage.ErrNotFound
1,004✔
211
        } else if err != nil {
1,313✔
212
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
213
        }
×
214

215
        a, err := protoutil.UnmarshalAnyJSON(data)
309✔
216
        if isUnknownType(err) {
310✔
217
                return nil, storage.ErrNotFound
1✔
218
        } else if err != nil {
309✔
219
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
220
        }
×
221

222
        return &databroker.Record{
308✔
223
                Version:    version,
308✔
224
                Type:       recordType,
308✔
225
                Id:         recordID,
308✔
226
                Data:       a,
308✔
227
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
308✔
228
        }, nil
308✔
229
}
230

231
func listChangedRecordsAfter(ctx context.Context, q querier, recordType string, lastRecordVersion uint64) ([]*databroker.Record, error) {
65✔
232
        args := []any{lastRecordVersion, recordBatchSize}
65✔
233
        query := `
65✔
234
                SELECT type, id, version, data, modified_at, deleted_at
65✔
235
                FROM ` + schemaName + `.` + recordChangesTableName + `
65✔
236
                WHERE version>$1
65✔
237
        `
65✔
238
        if recordType != "" {
108✔
239
                args = append(args, recordType)
43✔
240
                query += ` AND type=$3`
43✔
241
        }
43✔
242
        query += `
65✔
243
                ORDER BY version
65✔
244
                LIMIT $2
65✔
245
        `
65✔
246
        rows, err := q.Query(ctx, query, args...)
65✔
247
        if err != nil {
65✔
248
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
249
        }
×
250
        return pgx.CollectRows(rows, collectRecord)
65✔
251
}
252

253
func listLatestRecordsAfter(ctx context.Context, q querier, expr storage.FilterExpression, lastRecordType, lastRecordID string) ([]*databroker.Record, error) {
69✔
254
        args := []any{lastRecordType, lastRecordID, recordBatchSize}
69✔
255
        query := `
69✔
256
                SELECT type, id, version, data, modified_at, NULL::timestamptz
69✔
257
                FROM ` + schemaName + `.` + recordsTableName + `
69✔
258
                WHERE ((type>$1) OR (type=$1 AND id>$2))
69✔
259
        `
69✔
260
        if expr != nil {
126✔
261
                query += "AND "
57✔
262
                err := addFilterExpressionToQuery(&query, &args, expr)
57✔
263
                if err != nil {
57✔
264
                        return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err)
×
265
                }
×
266
        }
267
        query += `
69✔
268
                ORDER BY type, id
69✔
269
                LIMIT $3
69✔
270
        `
69✔
271
        rows, err := q.Query(ctx, query, args...)
69✔
272
        if err != nil {
69✔
273
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
274
        }
×
275
        return pgx.CollectRows(rows, collectRecord)
69✔
276
}
277

278
func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
2✔
279
        var services []*registry.Service
2✔
280

2✔
281
        query := `
2✔
282
                SELECT kind, endpoint
2✔
283
                FROM  ` + schemaName + `.` + servicesTableName + `
2✔
284
                ORDER BY kind, endpoint
2✔
285
        `
2✔
286
        rows, err := q.Query(ctx, query)
2✔
287
        if err != nil {
2✔
288
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
289
        }
×
290
        defer rows.Close()
2✔
291

2✔
292
        for rows.Next() {
5✔
293
                var kind, endpoint string
3✔
294
                err = rows.Scan(&kind, &endpoint)
3✔
295
                if err != nil {
3✔
296
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
297
                }
×
298

299
                services = append(services, &registry.Service{
3✔
300
                        Kind:     registry.ServiceKind(registry.ServiceKind_value[kind]),
3✔
301
                        Endpoint: endpoint,
3✔
302
                })
3✔
303
        }
304
        err = rows.Err()
2✔
305
        if err != nil {
2✔
306
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
307
        }
×
308

309
        return services, nil
2✔
310
}
311

312
func listTypes(ctx context.Context, q querier) ([]string, error) {
11✔
313
        query := `
11✔
314
                SELECT DISTINCT type
11✔
315
                FROM ` + schemaName + `.` + recordsTableName + `
11✔
316
        `
11✔
317
        rows, err := q.Query(ctx, query)
11✔
318
        if err != nil {
20✔
319
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
9✔
320
        }
9✔
321
        defer rows.Close()
2✔
322

2✔
323
        var types []string
2✔
324
        for rows.Next() {
12✔
325
                var recordType string
10✔
326
                err = rows.Scan(&recordType)
10✔
327
                if err != nil {
10✔
328
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
329
                }
×
330

331
                types = append(types, recordType)
10✔
332
        }
333
        err = rows.Err()
2✔
334
        if err != nil {
2✔
UNCOV
335
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
UNCOV
336
        }
×
337

338
        sort.Strings(types)
2✔
339
        return types, nil
2✔
340
}
341

342
func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) {
14✔
343
        tbl := schemaName + "." + leasesTableName
14✔
344
        expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl)))
14✔
345
        now := timestamptzFromTimestamppb(timestamppb.Now())
14✔
346
        err = q.QueryRow(ctx, `
14✔
347
                INSERT INTO `+tbl+` (name, id, expires_at)
14✔
348
                VALUES ($1, $2, $3)
14✔
349
                ON CONFLICT (name) DO UPDATE
14✔
350
                SET id=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $2 ELSE `+tbl+`.id END,
14✔
351
                    expires_at=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $3 ELSE `+tbl+`.expires_at END
14✔
352
                RETURNING `+tbl+`.id
14✔
353
        `, leaseName, leaseID, expiresAt, now).Scan(&leaseHolderID)
14✔
354
        return leaseHolderID, err
14✔
355
}
14✔
356

357
func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error {
1,380✔
358
        data, err := jsonbFromAny(record.GetData())
1,380✔
359
        if err != nil {
1,380✔
360
                return fmt.Errorf("postgres: failed to convert any to json: %w", err)
×
361
        }
×
362

363
        modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt())
1,380✔
364
        deletedAt := timestamptzFromTimestamppb(record.GetDeletedAt())
1,380✔
365
        indexCIDR := &pgtype.Text{Valid: false}
1,380✔
366
        if cidr := storage.GetRecordIndexCIDR(record.GetData()); cidr != nil {
1,384✔
367
                indexCIDR.String = cidr.String()
4✔
368
                indexCIDR.Valid = true
4✔
369
        }
4✔
370

371
        query := `
1,380✔
372
                WITH t1 AS (
1,380✔
373
                        INSERT INTO ` + schemaName + `.` + recordChangesTableName + ` (type, id, data, modified_at, deleted_at)
1,380✔
374
                        VALUES ($1, $2, $3, $4, $5)
1,380✔
375
                        RETURNING *
1,380✔
376
                )
1,380✔
377
        `
1,380✔
378
        args := []any{
1,380✔
379
                record.GetType(), record.GetId(), data, modifiedAt, deletedAt,
1,380✔
380
        }
1,380✔
381
        if record.GetDeletedAt() == nil {
2,758✔
382
                query += `
1,378✔
383
                        INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
1,378✔
384
                        VALUES ($1, $2, (SELECT version FROM t1), $3, $4, $6)
1,378✔
385
                        ON CONFLICT (type, id) DO UPDATE
1,378✔
386
                        SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
1,378✔
387
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
1,378✔
388
                `
1,378✔
389
                args = append(args, indexCIDR)
1,378✔
390
        } else {
1,380✔
391
                query += `
2✔
392
                        DELETE FROM ` + schemaName + `.` + recordsTableName + `
2✔
393
                        WHERE type=$1 AND id=$2
2✔
394
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
2✔
395
                `
2✔
396
        }
2✔
397
        err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
1,380✔
398
        if err != nil && !isNotFound(err) {
2,390✔
399
                return fmt.Errorf("postgres: failed to execute query: %w", err)
1,010✔
400
        }
1,010✔
401

402
        return nil
370✔
403
}
404

405
// patchRecord updates specific fields of an existing record.
406
func patchRecord(
407
        ctx context.Context, p *pgxpool.Pool, record *databroker.Record, fields *fieldmaskpb.FieldMask,
408
) error {
204✔
409
        tx, err := p.Begin(ctx)
204✔
410
        if err != nil {
204✔
411
                return err
×
412
        }
×
413
        defer func() { _ = tx.Rollback(ctx) }()
408✔
414

415
        existing, err := getRecord(ctx, tx, record.GetType(), record.GetId(), lockModeUpdate)
204✔
416
        if isNotFound(err) {
206✔
417
                return storage.ErrNotFound
2✔
418
        } else if err != nil {
204✔
419
                return err
×
420
        }
×
421

422
        if err := storage.PatchRecord(existing, record, fields); err != nil {
202✔
423
                return err
×
424
        }
×
425

426
        if err := putRecordAndChange(ctx, tx, record); err != nil {
202✔
427
                return err
×
428
        }
×
429

430
        return tx.Commit(ctx)
202✔
431
}
432

433
func putService(ctx context.Context, q querier, svc *registry.Service, expiresAt time.Time) error {
4✔
434
        query := `
4✔
435
                INSERT INTO ` + schemaName + `.` + servicesTableName + ` (kind, endpoint, expires_at)
4✔
436
                VALUES ($1, $2, $3)
4✔
437
                ON CONFLICT (kind, endpoint) DO UPDATE
4✔
438
                SET expires_at=$3
4✔
439
        `
4✔
440
        _, err := q.Exec(ctx, query, svc.GetKind().String(), svc.GetEndpoint(), expiresAt)
4✔
441
        return err
4✔
442
}
4✔
443

444
func setCheckpoint(ctx context.Context, q querier, serverVersion, recordVersion uint64) error {
1✔
445
        var sv, rv pgtype.Numeric
1✔
446
        err := sv.Scan(strconv.FormatUint(serverVersion, 10))
1✔
447
        if err != nil {
1✔
448
                return err
×
449
        }
×
450
        err = rv.Scan(strconv.FormatUint(recordVersion, 10))
1✔
451
        if err != nil {
1✔
452
                return err
×
453
        }
×
454
        _, err = q.Exec(ctx, `
1✔
455
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
456
                SET server_version=$1, record_version=$2
1✔
457
        `, sv, rv)
1✔
458
        return err
1✔
459
}
460

461
func setOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
17✔
462
        if proto.Equal(options, new(databroker.Options)) {
18✔
463
                _, err := q.Exec(ctx, `
1✔
464
                        DELETE FROM `+schemaName+`.`+recordOptionsTableName+`
1✔
465
                        WHERE type=$1
1✔
466
                `, recordType)
1✔
467
                return err
1✔
468
        }
1✔
469

470
        capacity := pgtype.Int8{}
16✔
471
        if options != nil && options.Capacity != nil {
23✔
472
                capacity.Int64 = int64(options.GetCapacity())
7✔
473
                capacity.Valid = true
7✔
474
        }
7✔
475

476
        _, err := q.Exec(ctx, `
16✔
477
                INSERT INTO `+schemaName+`.`+recordOptionsTableName+` (type, capacity, fields)
16✔
478
                VALUES ($1, $2, $3)
16✔
479
                ON CONFLICT (type) DO UPDATE
16✔
480
                SET capacity=$2, fields=$3
16✔
481
        `, recordType, capacity, options.GetIndexableFields())
16✔
482
        return err
16✔
483
}
484

485
func signalRecordChange(ctx context.Context, q querier) error {
339✔
486
        _, err := q.Exec(ctx, `NOTIFY `+recordChangeNotifyName)
339✔
487
        return err
339✔
488
}
339✔
489

490
func signalServiceChange(ctx context.Context, q querier) error {
2✔
491
        _, err := q.Exec(ctx, `NOTIFY `+serviceChangeNotifyName)
2✔
492
        return err
2✔
493
}
2✔
494

495
func jsonbFromAny(a *anypb.Any) ([]byte, error) {
1,380✔
496
        if a == nil {
2,391✔
497
                return nil, nil
1,011✔
498
        }
1,011✔
499

500
        return protojson.Marshal(a)
369✔
501
}
502

503
func timestamppbFromTimestamptz(ts pgtype.Timestamptz) *timestamppb.Timestamp {
1,150✔
504
        if !ts.Valid {
1,570✔
505
                return nil
420✔
506
        }
420✔
507
        return timestamppb.New(ts.Time)
730✔
508
}
509

510
func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz {
2,788✔
511
        if !ts.IsValid() {
4,166✔
512
                return pgtype.Timestamptz{}
1,378✔
513
        }
1,378✔
514
        return pgtype.Timestamptz{Time: ts.AsTime(), Valid: true}
1,410✔
515
}
516

517
func isNotFound(err error) bool {
2,699✔
518
        return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound)
2,699✔
519
}
2,699✔
520

521
func isUnknownType(err error) bool {
730✔
522
        if err == nil {
1,457✔
523
                return false
727✔
524
        }
727✔
525

526
        return errors.Is(err, protoregistry.NotFound) ||
3✔
527
                strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string
3✔
528
}
529

530
func collectRecord(row pgx.CollectableRow) (*databroker.Record, error) {
421✔
531
        var recordType, id string
421✔
532
        var version uint64
421✔
533
        var data []byte
421✔
534
        var modifiedAt pgtype.Timestamptz
421✔
535
        var deletedAt pgtype.Timestamptz
421✔
536
        err := row.Scan(&recordType, &id, &version, &data, &modifiedAt, &deletedAt)
421✔
537
        if err != nil {
421✔
538
                return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
539
        }
×
540

541
        a, err := protoutil.UnmarshalAnyJSON(data)
421✔
542
        if isUnknownType(err) || len(data) == 0 {
423✔
543
                a = protoutil.ToAny(protoutil.ToStruct(map[string]string{
2✔
544
                        "id": id,
2✔
545
                }))
2✔
546
        } else if err != nil {
421✔
547
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
548
        }
×
549

550
        return &databroker.Record{
421✔
551
                Version:    version,
421✔
552
                Type:       recordType,
421✔
553
                Id:         id,
421✔
554
                Data:       a,
421✔
555
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
421✔
556
                DeletedAt:  timestamppbFromTimestamptz(deletedAt),
421✔
557
        }, nil
421✔
558
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc