• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 18593757278

17 Oct 2025 01:11PM UTC coverage: 53.936% (-0.03%) from 53.97%
18593757278

push

github

web-flow
authenticate: load session directly (#5889)

## Summary
Remove the session middleware and load sessions directly. This code was
only ever used by the authenticate service and it was confusing to store
the session state in the context.

## Related issues
- [ENG-3001](https://linear.app/pomerium/issue/ENG-3001/core-session-improvements)


## Checklist

- [x] reference any related issues
- [x] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [x] ready for review

6 of 8 new or added lines in 1 file covered. (75.0%)

35 existing lines in 9 files now uncovered.

27416 of 50831 relevant lines covered (53.94%)

86.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.14
/pkg/storage/postgres/postgres.go
1
// Package postgres contains an implementation of the storage.Backend backed by postgres.
2
package postgres
3

4
import (
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sort"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/jackc/pgx/v5"
14
        "github.com/jackc/pgx/v5/pgconn"
15
        "github.com/jackc/pgx/v5/pgtype"
16
        "github.com/jackc/pgx/v5/pgxpool"
17
        "google.golang.org/protobuf/encoding/protojson"
18
        "google.golang.org/protobuf/proto"
19
        "google.golang.org/protobuf/reflect/protoregistry"
20
        "google.golang.org/protobuf/types/known/anypb"
21
        "google.golang.org/protobuf/types/known/fieldmaskpb"
22
        "google.golang.org/protobuf/types/known/timestamppb"
23

24
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
25
        "github.com/pomerium/pomerium/pkg/grpc/registry"
26
        "github.com/pomerium/pomerium/pkg/protoutil"
27
        "github.com/pomerium/pomerium/pkg/storage"
28
)
29

30
// Paging and polling parameters.
const (
        // recordBatchSize is passed as the LIMIT of the record listing queries,
        // so at most this many records are returned per query.
        recordBatchSize = 64
        // watchPollInterval is a 30 second interval.
        // NOTE(review): presumably the fallback polling period for change
        // watchers defined elsewhere in the package — confirm against its users.
        watchPollInterval = 30 * time.Second
)
34

35
// Identifiers spliced into the SQL statements built in this file. Table names
// are always qualified as schemaName + "." + <table name>.
var (
        // schemaName is the postgres schema that holds every table below.
        schemaName = "pomerium"

        // Table names.
        migrationInfoTableName = "migration_info"
        recordsTableName       = "records"
        recordChangesTableName = "record_changes"
        recordOptionsTableName = "record_options"
        leasesTableName        = "leases"
        servicesTableName      = "services"
        checkpointsTableName   = "checkpoints"

        // Channel names used with the NOTIFY statement.
        recordChangeNotifyName  = "pomerium_record_change"
        serviceChangeNotifyName = "pomerium_service_change"
)
47

48
type querier interface {
49
        Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
50
        Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
51
        QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
52
}
53

54
func clearRecords(ctx context.Context, q querier, newServerVersion uint64) error {
1✔
55
        _, err := q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordChangesTableName)
1✔
56
        if err != nil {
1✔
57
                return err
×
58
        }
×
59
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordsTableName)
1✔
60
        if err != nil {
1✔
61
                return err
×
62
        }
×
63
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordOptionsTableName)
1✔
64
        if err != nil {
1✔
65
                return err
×
66
        }
×
67
        _, err = q.Exec(ctx, `
1✔
68
                UPDATE `+schemaName+`.`+migrationInfoTableName+`
1✔
69
                SET server_version = $1
1✔
70
        `, newServerVersion)
1✔
71
        if err != nil {
1✔
72
                return err
×
73
        }
×
74

75
        _, err = q.Exec(ctx, `
1✔
76
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
77
                SET server_version = 0, record_version = 0
1✔
78
        `)
1✔
79
        if err != nil {
1✔
80
                return err
×
81
        }
×
82

83
        return nil
1✔
84
}
85

86
func deleteChangesBefore(ctx context.Context, q querier, cutoff time.Time) error {
3✔
87
        // we always want to keep at least one row in the changes table,
3✔
88
        // so we take the changes table, sort by version descending, skip the first row
3✔
89
        // and then find any changes before the cutoff
3✔
90
        _, err := q.Exec(ctx, `
3✔
91
                WITH t1 AS (
3✔
92
                        SELECT *
3✔
93
                        FROM `+schemaName+`.`+recordChangesTableName+`
3✔
94
                        ORDER BY version DESC
3✔
95
                        OFFSET 1
3✔
96
                        FOR UPDATE SKIP LOCKED
3✔
97
                ), t2 AS (
3✔
98
                        SELECT version
3✔
99
                        FROM t1
3✔
100
                        WHERE modified_at<$1
3✔
101
                        FOR UPDATE SKIP LOCKED
3✔
102
                )
3✔
103
                DELETE FROM `+schemaName+`.`+recordChangesTableName+` t3
3✔
104
                USING t2
3✔
105
                WHERE t2.version=t3.version
3✔
106
        `, cutoff)
3✔
107
        return err
3✔
108
}
3✔
109

110
func deleteExpiredServices(ctx context.Context, q querier, cutoff time.Time) (rowCount int64, err error) {
9✔
111
        cmd, err := q.Exec(ctx, `
9✔
112
                WITH t1 AS (
9✔
113
                        SELECT kind, endpoint
9✔
114
                        FROM `+schemaName+`.`+servicesTableName+`
9✔
115
                        WHERE expires_at<$1
9✔
116
                        FOR UPDATE SKIP LOCKED
9✔
117
                )
9✔
118
                DELETE FROM `+schemaName+`.`+servicesTableName+` t2
9✔
119
                USING t1
9✔
120
                WHERE t1.kind=t2.kind
9✔
121
                  AND t1.endpoint=t2.endpoint
9✔
122
        `, cutoff)
9✔
123
        if err != nil {
12✔
124
                return 0, err
3✔
125
        }
3✔
126
        return cmd.RowsAffected(), nil
6✔
127
}
128

129
func dup(record *databroker.Record) *databroker.Record {
1,354✔
130
        return proto.Clone(record).(*databroker.Record)
1,354✔
131
}
1,354✔
132

133
func enforceOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
131✔
134
        if options == nil || options.Capacity == nil {
251✔
135
                return nil
120✔
136
        }
120✔
137

138
        _, err := q.Exec(ctx, `
11✔
139
                DELETE FROM `+schemaName+`.`+recordsTableName+`
11✔
140
                WHERE type=$1
11✔
141
                  AND id NOT IN (
11✔
142
                        SELECT id
11✔
143
                        FROM `+schemaName+`.`+recordsTableName+`
11✔
144
                        WHERE type=$1
11✔
145
                        ORDER BY version DESC
11✔
146
                        LIMIT $2
11✔
147
                )
11✔
148
        `, recordType, options.GetCapacity())
11✔
149
        return err
11✔
150
}
151

152
func getRecordVersionRange(ctx context.Context, q querier) (earliestRecordVersion, latestRecordVersion uint64, err error) {
36✔
153
        err = q.QueryRow(ctx, `
36✔
154
                SELECT COALESCE(MIN(version), 0), COALESCE(MAX(version), 0)
36✔
155
                FROM `+schemaName+`.`+recordChangesTableName+`
36✔
156
        `).Scan(&earliestRecordVersion, &latestRecordVersion)
36✔
157
        return earliestRecordVersion, latestRecordVersion, err
36✔
158
}
36✔
159

160
func getCheckpoint(ctx context.Context, q querier) (serverVersion, recordVersion uint64, err error) {
1✔
161
        var sv, rv pgtype.Numeric
1✔
162
        err = q.QueryRow(ctx, `
1✔
163
                SELECT server_version, record_version
1✔
164
                FROM `+schemaName+`.`+checkpointsTableName+`
1✔
165
        `).Scan(&sv, &rv)
1✔
166
        return sv.Int.Uint64(), rv.Int.Uint64(), err
1✔
167
}
1✔
168

169
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
132✔
170
        var capacity pgtype.Int8
132✔
171
        err := q.QueryRow(ctx, `
132✔
172
                SELECT capacity
132✔
173
                FROM `+schemaName+`.`+recordOptionsTableName+`
132✔
174
                WHERE type=$1
132✔
175
        `, recordType).Scan(&capacity)
132✔
176
        if err != nil && !isNotFound(err) {
132✔
177
                return nil, err
×
178
        }
×
179
        options := new(databroker.Options)
132✔
180
        if capacity.Valid {
143✔
181
                options.Capacity = proto.Uint64(uint64(capacity.Int64))
11✔
182
        }
11✔
183
        return options, nil
132✔
184
}
185

186
// lockMode is a SQL row-locking clause; getRecord appends it verbatim to the
// end of its SELECT statement.
type lockMode string

const (
        // lockModeNone appends nothing, so the row is read without a lock clause.
        lockModeNone lockMode = ""
        // lockModeUpdate appends FOR UPDATE to the SELECT.
        lockModeUpdate lockMode = "FOR UPDATE"
)
192

193
func getRecord(
194
        ctx context.Context, q querier, recordType, recordID string, lockMode lockMode,
195
) (*databroker.Record, error) {
1,313✔
196
        var version uint64
1,313✔
197
        var data []byte
1,313✔
198
        var modifiedAt pgtype.Timestamptz
1,313✔
199
        err := q.QueryRow(ctx, `
1,313✔
200
                SELECT version, data, modified_at
1,313✔
201
                  FROM `+schemaName+`.`+recordsTableName+`
1,313✔
202
                 WHERE type=$1 AND id=$2 `+string(lockMode),
1,313✔
203
                recordType, recordID).Scan(&version, &data, &modifiedAt)
1,313✔
204
        if isNotFound(err) {
2,317✔
205
                return nil, storage.ErrNotFound
1,004✔
206
        } else if err != nil {
1,313✔
207
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
208
        }
×
209

210
        a, err := protoutil.UnmarshalAnyJSON(data)
309✔
211
        if isUnknownType(err) {
310✔
212
                return nil, storage.ErrNotFound
1✔
213
        } else if err != nil {
309✔
214
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
215
        }
×
216

217
        return &databroker.Record{
308✔
218
                Version:    version,
308✔
219
                Type:       recordType,
308✔
220
                Id:         recordID,
308✔
221
                Data:       a,
308✔
222
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
308✔
223
        }, nil
308✔
224
}
225

226
func listChangedRecordsAfter(ctx context.Context, q querier, recordType string, lastRecordVersion uint64) ([]*databroker.Record, error) {
57✔
227
        args := []any{lastRecordVersion, recordBatchSize}
57✔
228
        query := `
57✔
229
                SELECT type, id, version, data, modified_at, deleted_at
57✔
230
                FROM ` + schemaName + `.` + recordChangesTableName + `
57✔
231
                WHERE version>$1
57✔
232
        `
57✔
233
        if recordType != "" {
91✔
234
                args = append(args, recordType)
34✔
235
                query += ` AND type=$3`
34✔
236
        }
34✔
237
        query += `
57✔
238
                ORDER BY version
57✔
239
                LIMIT $2
57✔
240
        `
57✔
241
        rows, err := q.Query(ctx, query, args...)
57✔
242
        if err != nil {
57✔
243
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
244
        }
×
245
        return pgx.CollectRows(rows, collectRecord)
57✔
246
}
247

248
func listLatestRecordsAfter(ctx context.Context, q querier, expr storage.FilterExpression, lastRecordType, lastRecordID string) ([]*databroker.Record, error) {
27✔
249
        args := []any{lastRecordType, lastRecordID, recordBatchSize}
27✔
250
        query := `
27✔
251
                SELECT type, id, version, data, modified_at, NULL::timestamptz
27✔
252
                FROM ` + schemaName + `.` + recordsTableName + `
27✔
253
                WHERE ((type>$1) OR (type=$1 AND id>$2))
27✔
254
        `
27✔
255
        if expr != nil {
44✔
256
                query += "AND "
17✔
257
                err := addFilterExpressionToQuery(&query, &args, expr)
17✔
258
                if err != nil {
17✔
259
                        return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err)
×
260
                }
×
261
        }
262
        query += `
27✔
263
                ORDER BY type, id
27✔
264
                LIMIT $3
27✔
265
        `
27✔
266
        rows, err := q.Query(ctx, query, args...)
27✔
267
        if err != nil {
27✔
268
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
269
        }
×
270
        return pgx.CollectRows(rows, collectRecord)
27✔
271
}
272

273
func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
2✔
274
        var services []*registry.Service
2✔
275

2✔
276
        query := `
2✔
277
                SELECT kind, endpoint
2✔
278
                FROM  ` + schemaName + `.` + servicesTableName + `
2✔
279
                ORDER BY kind, endpoint
2✔
280
        `
2✔
281
        rows, err := q.Query(ctx, query)
2✔
282
        if err != nil {
2✔
283
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
284
        }
×
285
        defer rows.Close()
2✔
286

2✔
287
        for rows.Next() {
5✔
288
                var kind, endpoint string
3✔
289
                err = rows.Scan(&kind, &endpoint)
3✔
290
                if err != nil {
3✔
291
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
292
                }
×
293

294
                services = append(services, &registry.Service{
3✔
295
                        Kind:     registry.ServiceKind(registry.ServiceKind_value[kind]),
3✔
296
                        Endpoint: endpoint,
3✔
297
                })
3✔
298
        }
299
        err = rows.Err()
2✔
300
        if err != nil {
2✔
301
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
302
        }
×
303

304
        return services, nil
2✔
305
}
306

307
func listTypes(ctx context.Context, q querier) ([]string, error) {
11✔
308
        query := `
11✔
309
                SELECT DISTINCT type
11✔
310
                FROM ` + schemaName + `.` + recordsTableName + `
11✔
311
        `
11✔
312
        rows, err := q.Query(ctx, query)
11✔
313
        if err != nil {
20✔
314
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
9✔
315
        }
9✔
316
        defer rows.Close()
2✔
317

2✔
318
        var types []string
2✔
319
        for rows.Next() {
12✔
320
                var recordType string
10✔
321
                err = rows.Scan(&recordType)
10✔
322
                if err != nil {
10✔
323
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
324
                }
×
325

326
                types = append(types, recordType)
10✔
327
        }
328
        err = rows.Err()
2✔
329
        if err != nil {
2✔
UNCOV
330
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
UNCOV
331
        }
×
332

333
        sort.Strings(types)
2✔
334
        return types, nil
2✔
335
}
336

337
func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) {
14✔
338
        tbl := schemaName + "." + leasesTableName
14✔
339
        expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl)))
14✔
340
        now := timestamptzFromTimestamppb(timestamppb.Now())
14✔
341
        err = q.QueryRow(ctx, `
14✔
342
                INSERT INTO `+tbl+` (name, id, expires_at)
14✔
343
                VALUES ($1, $2, $3)
14✔
344
                ON CONFLICT (name) DO UPDATE
14✔
345
                SET id=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $2 ELSE `+tbl+`.id END,
14✔
346
                    expires_at=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $3 ELSE `+tbl+`.expires_at END
14✔
347
                RETURNING `+tbl+`.id
14✔
348
        `, leaseName, leaseID, expiresAt, now).Scan(&leaseHolderID)
14✔
349
        return leaseHolderID, err
14✔
350
}
14✔
351

352
func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error {
1,362✔
353
        data, err := jsonbFromAny(record.GetData())
1,362✔
354
        if err != nil {
1,362✔
355
                return fmt.Errorf("postgres: failed to convert any to json: %w", err)
×
356
        }
×
357

358
        modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt())
1,362✔
359
        deletedAt := timestamptzFromTimestamppb(record.GetDeletedAt())
1,362✔
360
        indexCIDR := &pgtype.Text{Valid: false}
1,362✔
361
        if cidr := storage.GetRecordIndexCIDR(record.GetData()); cidr != nil {
1,362✔
362
                indexCIDR.String = cidr.String()
×
363
                indexCIDR.Valid = true
×
364
        }
×
365

366
        query := `
1,362✔
367
                WITH t1 AS (
1,362✔
368
                        INSERT INTO ` + schemaName + `.` + recordChangesTableName + ` (type, id, data, modified_at, deleted_at)
1,362✔
369
                        VALUES ($1, $2, $3, $4, $5)
1,362✔
370
                        RETURNING *
1,362✔
371
                )
1,362✔
372
        `
1,362✔
373
        args := []any{
1,362✔
374
                record.GetType(), record.GetId(), data, modifiedAt, deletedAt,
1,362✔
375
        }
1,362✔
376
        if record.GetDeletedAt() == nil {
2,713✔
377
                query += `
1,351✔
378
                        INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
1,351✔
379
                        VALUES ($1, $2, (SELECT version FROM t1), $3, $4, $6)
1,351✔
380
                        ON CONFLICT (type, id) DO UPDATE
1,351✔
381
                        SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
1,351✔
382
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
1,351✔
383
                `
1,351✔
384
                args = append(args, indexCIDR)
1,351✔
385
        } else {
1,362✔
386
                query += `
11✔
387
                        DELETE FROM ` + schemaName + `.` + recordsTableName + `
11✔
388
                        WHERE type=$1 AND id=$2
11✔
389
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
11✔
390
                `
11✔
391
        }
11✔
392
        err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
1,362✔
393
        if err != nil && !isNotFound(err) {
2,372✔
394
                return fmt.Errorf("postgres: failed to execute query: %w", err)
1,010✔
395
        }
1,010✔
396

397
        return nil
352✔
398
}
399

400
// patchRecord updates specific fields of an existing record.
401
func patchRecord(
402
        ctx context.Context, p *pgxpool.Pool, record *databroker.Record, fields *fieldmaskpb.FieldMask,
403
) error {
204✔
404
        tx, err := p.Begin(ctx)
204✔
405
        if err != nil {
204✔
406
                return err
×
407
        }
×
408
        defer func() { _ = tx.Rollback(ctx) }()
408✔
409

410
        existing, err := getRecord(ctx, tx, record.GetType(), record.GetId(), lockModeUpdate)
204✔
411
        if isNotFound(err) {
206✔
412
                return storage.ErrNotFound
2✔
413
        } else if err != nil {
204✔
414
                return err
×
415
        }
×
416

417
        if err := storage.PatchRecord(existing, record, fields); err != nil {
202✔
418
                return err
×
419
        }
×
420

421
        if err := putRecordAndChange(ctx, tx, record); err != nil {
202✔
422
                return err
×
423
        }
×
424

425
        return tx.Commit(ctx)
202✔
426
}
427

428
func putService(ctx context.Context, q querier, svc *registry.Service, expiresAt time.Time) error {
4✔
429
        query := `
4✔
430
                INSERT INTO ` + schemaName + `.` + servicesTableName + ` (kind, endpoint, expires_at)
4✔
431
                VALUES ($1, $2, $3)
4✔
432
                ON CONFLICT (kind, endpoint) DO UPDATE
4✔
433
                SET expires_at=$3
4✔
434
        `
4✔
435
        _, err := q.Exec(ctx, query, svc.GetKind().String(), svc.GetEndpoint(), expiresAt)
4✔
436
        return err
4✔
437
}
4✔
438

439
func setCheckpoint(ctx context.Context, q querier, serverVersion, recordVersion uint64) error {
1✔
440
        var sv, rv pgtype.Numeric
1✔
441
        err := sv.Scan(strconv.FormatUint(serverVersion, 10))
1✔
442
        if err != nil {
1✔
443
                return err
×
444
        }
×
445
        err = rv.Scan(strconv.FormatUint(recordVersion, 10))
1✔
446
        if err != nil {
1✔
447
                return err
×
448
        }
×
449
        _, err = q.Exec(ctx, `
1✔
450
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
451
                SET server_version=$1, record_version=$2
1✔
452
        `, sv, rv)
1✔
453
        return err
1✔
454
}
455

456
func setOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
2✔
457
        capacity := pgtype.Int8{}
2✔
458
        if options != nil && options.Capacity != nil {
4✔
459
                capacity.Int64 = int64(options.GetCapacity())
2✔
460
                capacity.Valid = true
2✔
461
        }
2✔
462

463
        _, err := q.Exec(ctx, `
2✔
464
                INSERT INTO `+schemaName+`.`+recordOptionsTableName+` (type, capacity)
2✔
465
                VALUES ($1, $2)
2✔
466
                ON CONFLICT (type) DO UPDATE
2✔
467
                SET capacity=$2
2✔
468
        `, recordType, capacity)
2✔
469
        return err
2✔
470
}
471

472
func signalRecordChange(ctx context.Context, q querier) error {
333✔
473
        _, err := q.Exec(ctx, `NOTIFY `+recordChangeNotifyName)
333✔
474
        return err
333✔
475
}
333✔
476

477
func signalServiceChange(ctx context.Context, q querier) error {
2✔
478
        _, err := q.Exec(ctx, `NOTIFY `+serviceChangeNotifyName)
2✔
479
        return err
2✔
480
}
2✔
481

482
func jsonbFromAny(a *anypb.Any) ([]byte, error) {
1,362✔
483
        if a == nil {
2,373✔
484
                return nil, nil
1,011✔
485
        }
1,011✔
486

487
        return protojson.Marshal(a)
351✔
488
}
489

490
func timestamppbFromTimestamptz(ts pgtype.Timestamptz) *timestamppb.Timestamp {
1,050✔
491
        if !ts.Valid {
1,420✔
492
                return nil
370✔
493
        }
370✔
494
        return timestamppb.New(ts.Time)
680✔
495
}
496

497
func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz {
2,752✔
498
        if !ts.IsValid() {
4,103✔
499
                return pgtype.Timestamptz{}
1,351✔
500
        }
1,351✔
501
        return pgtype.Timestamptz{Time: ts.AsTime(), Valid: true}
1,401✔
502
}
503

504
func isNotFound(err error) bool {
2,659✔
505
        return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound)
2,659✔
506
}
2,659✔
507

508
func isUnknownType(err error) bool {
680✔
509
        if err == nil {
1,357✔
510
                return false
677✔
511
        }
677✔
512

513
        return errors.Is(err, protoregistry.NotFound) ||
3✔
514
                strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string
3✔
515
}
516

517
func collectRecord(row pgx.CollectableRow) (*databroker.Record, error) {
371✔
518
        var recordType, id string
371✔
519
        var version uint64
371✔
520
        var data []byte
371✔
521
        var modifiedAt pgtype.Timestamptz
371✔
522
        var deletedAt pgtype.Timestamptz
371✔
523
        err := row.Scan(&recordType, &id, &version, &data, &modifiedAt, &deletedAt)
371✔
524
        if err != nil {
371✔
525
                return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
526
        }
×
527

528
        a, err := protoutil.UnmarshalAnyJSON(data)
371✔
529
        if isUnknownType(err) || len(data) == 0 {
373✔
530
                a = protoutil.ToAny(protoutil.ToStruct(map[string]string{
2✔
531
                        "id": id,
2✔
532
                }))
2✔
533
        } else if err != nil {
371✔
534
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
535
        }
×
536

537
        return &databroker.Record{
371✔
538
                Version:    version,
371✔
539
                Type:       recordType,
371✔
540
                Id:         id,
371✔
541
                Data:       a,
371✔
542
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
371✔
543
                DeletedAt:  timestamppbFromTimestamptz(deletedAt),
371✔
544
        }, nil
371✔
545
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc