• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 19436539046

17 Nov 2025 04:19PM UTC coverage: 54.84% (-0.01%) from 54.85%
19436539046

push

github

web-flow
chore: add eslint config (#5931)

28705 of 52343 relevant lines covered (54.84%)

93.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.29
/pkg/storage/postgres/postgres.go
1
// Package postgres contains an implementation of the storage.Backend backed by postgres.
2
package postgres
3

4
import (
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sort"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/jackc/pgx/v5"
14
        "github.com/jackc/pgx/v5/pgconn"
15
        "github.com/jackc/pgx/v5/pgtype"
16
        "github.com/jackc/pgx/v5/pgxpool"
17
        "google.golang.org/protobuf/encoding/protojson"
18
        "google.golang.org/protobuf/proto"
19
        "google.golang.org/protobuf/reflect/protoregistry"
20
        "google.golang.org/protobuf/types/known/anypb"
21
        "google.golang.org/protobuf/types/known/fieldmaskpb"
22
        "google.golang.org/protobuf/types/known/timestamppb"
23

24
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
25
        "github.com/pomerium/pomerium/pkg/grpc/registry"
26
        "github.com/pomerium/pomerium/pkg/protoutil"
27
        "github.com/pomerium/pomerium/pkg/storage"
28
)
29

30
// Tunables shared by the postgres backend.
const (
        // recordBatchSize is the row limit bound to the LIMIT placeholder of the
        // paged record listing queries in this file.
        recordBatchSize = 64

        // watchPollInterval is not referenced in this part of the file.
        // NOTE(review): presumably the fallback polling period used when
        // waiting for change notifications - confirm against its callers.
        watchPollInterval = 30 * time.Second
)
34

35
// Names of the postgres objects used by the queries in this file. Table names
// are always qualified with schemaName when they are spliced into SQL text.
var (
        // schemaName is the schema that holds every table below.
        schemaName = "pomerium"

        // Tables.
        migrationInfoTableName = "migration_info"
        recordsTableName       = "records"
        recordChangesTableName = "record_changes"
        recordOptionsTableName = "record_options"
        leasesTableName        = "leases"
        servicesTableName      = "services"
        checkpointsTableName   = "checkpoints"

        // Channels used with NOTIFY.
        recordChangeNotifyName  = "pomerium_record_change"
        serviceChangeNotifyName = "pomerium_service_change"
)
47

48
type querier interface {
49
        Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
50
        Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
51
        QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
52
}
53

54
func clearRecords(ctx context.Context, q querier, newServerVersion uint64) error {
1✔
55
        _, err := q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordChangesTableName)
1✔
56
        if err != nil {
1✔
57
                return err
×
58
        }
×
59
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordsTableName)
1✔
60
        if err != nil {
1✔
61
                return err
×
62
        }
×
63
        _, err = q.Exec(ctx, `DELETE FROM `+schemaName+`.`+recordOptionsTableName)
1✔
64
        if err != nil {
1✔
65
                return err
×
66
        }
×
67
        _, err = q.Exec(ctx, `
1✔
68
                UPDATE `+schemaName+`.`+migrationInfoTableName+`
1✔
69
                SET server_version = $1
1✔
70
        `, newServerVersion)
1✔
71
        if err != nil {
1✔
72
                return err
×
73
        }
×
74

75
        _, err = q.Exec(ctx, `
1✔
76
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
77
                SET server_version = 0, record_version = 0
1✔
78
        `)
1✔
79
        if err != nil {
1✔
80
                return err
×
81
        }
×
82

83
        return nil
1✔
84
}
85

86
func deleteChangesBefore(ctx context.Context, q querier, cutoff time.Time) error {
1✔
87
        // we always want to keep at least one row in the changes table,
1✔
88
        // so we take the changes table, sort by version descending, skip the first row
1✔
89
        // and then find any changes before the cutoff
1✔
90
        _, err := q.Exec(ctx, `
1✔
91
                WITH t1 AS (
1✔
92
                        SELECT *
1✔
93
                        FROM `+schemaName+`.`+recordChangesTableName+`
1✔
94
                        ORDER BY version DESC
1✔
95
                        OFFSET 1
1✔
96
                        FOR UPDATE SKIP LOCKED
1✔
97
                ), t2 AS (
1✔
98
                        SELECT version
1✔
99
                        FROM t1
1✔
100
                        WHERE modified_at<$1
1✔
101
                        FOR UPDATE SKIP LOCKED
1✔
102
                )
1✔
103
                DELETE FROM `+schemaName+`.`+recordChangesTableName+` t3
1✔
104
                USING t2
1✔
105
                WHERE t2.version=t3.version
1✔
106
        `, cutoff)
1✔
107
        return err
1✔
108
}
1✔
109

110
func deleteExpiredServices(ctx context.Context, q querier, cutoff time.Time) (rowCount int64, err error) {
16✔
111
        cmd, err := q.Exec(ctx, `
16✔
112
                WITH t1 AS (
16✔
113
                        SELECT kind, endpoint
16✔
114
                        FROM `+schemaName+`.`+servicesTableName+`
16✔
115
                        WHERE expires_at<$1
16✔
116
                        FOR UPDATE SKIP LOCKED
16✔
117
                )
16✔
118
                DELETE FROM `+schemaName+`.`+servicesTableName+` t2
16✔
119
                USING t1
16✔
120
                WHERE t1.kind=t2.kind
16✔
121
                  AND t1.endpoint=t2.endpoint
16✔
122
        `, cutoff)
16✔
123
        if err != nil {
16✔
124
                return 0, err
×
125
        }
×
126
        return cmd.RowsAffected(), nil
16✔
127
}
128

129
func enforceOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
139✔
130
        if options == nil || options.Capacity == nil {
267✔
131
                return nil
128✔
132
        }
128✔
133

134
        _, err := q.Exec(ctx, `
11✔
135
                DELETE FROM `+schemaName+`.`+recordsTableName+`
11✔
136
                WHERE type=$1
11✔
137
                  AND id NOT IN (
11✔
138
                        SELECT id
11✔
139
                        FROM `+schemaName+`.`+recordsTableName+`
11✔
140
                        WHERE type=$1
11✔
141
                        ORDER BY version DESC
11✔
142
                        LIMIT $2
11✔
143
                )
11✔
144
        `, recordType, options.GetCapacity())
11✔
145
        return err
11✔
146
}
147

148
func getRecordVersionRange(ctx context.Context, q querier) (earliestRecordVersion, latestRecordVersion uint64, err error) {
59✔
149
        err = q.QueryRow(ctx, `
59✔
150
                SELECT COALESCE(MIN(version), 0), COALESCE(MAX(version), 0)
59✔
151
                FROM `+schemaName+`.`+recordChangesTableName+`
59✔
152
        `).Scan(&earliestRecordVersion, &latestRecordVersion)
59✔
153
        return earliestRecordVersion, latestRecordVersion, err
59✔
154
}
59✔
155

156
func getCheckpoint(ctx context.Context, q querier) (serverVersion, recordVersion uint64, err error) {
1✔
157
        var sv, rv pgtype.Numeric
1✔
158
        err = q.QueryRow(ctx, `
1✔
159
                SELECT server_version, record_version
1✔
160
                FROM `+schemaName+`.`+checkpointsTableName+`
1✔
161
        `).Scan(&sv, &rv)
1✔
162
        return sv.Int.Uint64(), rv.Int.Uint64(), err
1✔
163
}
1✔
164

165
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
140✔
166
        var capacity pgtype.Int8
140✔
167
        err := q.QueryRow(ctx, `
140✔
168
                SELECT capacity
140✔
169
                FROM `+schemaName+`.`+recordOptionsTableName+`
140✔
170
                WHERE type=$1
140✔
171
        `, recordType).Scan(&capacity)
140✔
172
        if err != nil && !isNotFound(err) {
140✔
173
                return nil, err
×
174
        }
×
175
        options := new(databroker.Options)
140✔
176
        if capacity.Valid {
151✔
177
                options.Capacity = proto.Uint64(uint64(capacity.Int64))
11✔
178
        }
11✔
179
        return options, nil
140✔
180
}
181

182
// lockMode is a row-locking clause that getRecord appends verbatim to the end
// of its SELECT statement.
type lockMode string

const (
        // lockModeNone appends nothing, so the row is read without a locking clause.
        lockModeNone = lockMode("")
        // lockModeUpdate appends "FOR UPDATE" to the SELECT.
        lockModeUpdate = lockMode("FOR UPDATE")
)
188

189
func getRecord(
190
        ctx context.Context, q querier, recordType, recordID string, lockMode lockMode,
191
) (*databroker.Record, error) {
1,313✔
192
        var version uint64
1,313✔
193
        var data []byte
1,313✔
194
        var modifiedAt pgtype.Timestamptz
1,313✔
195
        err := q.QueryRow(ctx, `
1,313✔
196
                SELECT version, data, modified_at
1,313✔
197
                  FROM `+schemaName+`.`+recordsTableName+`
1,313✔
198
                 WHERE type=$1 AND id=$2 `+string(lockMode),
1,313✔
199
                recordType, recordID).Scan(&version, &data, &modifiedAt)
1,313✔
200
        if isNotFound(err) {
2,317✔
201
                return nil, storage.ErrNotFound
1,004✔
202
        } else if err != nil {
1,313✔
203
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
204
        }
×
205

206
        a, err := protoutil.UnmarshalAnyJSON(data)
309✔
207
        if isUnknownType(err) {
310✔
208
                return nil, storage.ErrNotFound
1✔
209
        } else if err != nil {
309✔
210
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
211
        }
×
212

213
        return &databroker.Record{
308✔
214
                Version:    version,
308✔
215
                Type:       recordType,
308✔
216
                Id:         recordID,
308✔
217
                Data:       a,
308✔
218
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
308✔
219
        }, nil
308✔
220
}
221

222
func listChangedRecordsAfter(ctx context.Context, q querier, recordType string, lastRecordVersion uint64) ([]*databroker.Record, error) {
63✔
223
        args := []any{lastRecordVersion, recordBatchSize}
63✔
224
        query := `
63✔
225
                SELECT type, id, version, data, modified_at, deleted_at
63✔
226
                FROM ` + schemaName + `.` + recordChangesTableName + `
63✔
227
                WHERE version>$1
63✔
228
        `
63✔
229
        if recordType != "" {
103✔
230
                args = append(args, recordType)
40✔
231
                query += ` AND type=$3`
40✔
232
        }
40✔
233
        query += `
63✔
234
                ORDER BY version
63✔
235
                LIMIT $2
63✔
236
        `
63✔
237
        rows, err := q.Query(ctx, query, args...)
63✔
238
        if err != nil {
63✔
239
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
240
        }
×
241
        return pgx.CollectRows(rows, collectRecord)
63✔
242
}
243

244
func listLatestRecordsAfter(ctx context.Context, q querier, expr storage.FilterExpression, lastRecordType, lastRecordID string) ([]*databroker.Record, error) {
66✔
245
        args := []any{lastRecordType, lastRecordID, recordBatchSize}
66✔
246
        query := `
66✔
247
                SELECT type, id, version, data, modified_at, NULL::timestamptz
66✔
248
                FROM ` + schemaName + `.` + recordsTableName + `
66✔
249
                WHERE ((type>$1) OR (type=$1 AND id>$2))
66✔
250
        `
66✔
251
        if expr != nil {
120✔
252
                query += "AND "
54✔
253
                err := addFilterExpressionToQuery(&query, &args, expr)
54✔
254
                if err != nil {
54✔
255
                        return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err)
×
256
                }
×
257
        }
258
        query += `
66✔
259
                ORDER BY type, id
66✔
260
                LIMIT $3
66✔
261
        `
66✔
262
        rows, err := q.Query(ctx, query, args...)
66✔
263
        if err != nil {
66✔
264
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
265
        }
×
266
        return pgx.CollectRows(rows, collectRecord)
66✔
267
}
268

269
func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
2✔
270
        var services []*registry.Service
2✔
271

2✔
272
        query := `
2✔
273
                SELECT kind, endpoint
2✔
274
                FROM  ` + schemaName + `.` + servicesTableName + `
2✔
275
                ORDER BY kind, endpoint
2✔
276
        `
2✔
277
        rows, err := q.Query(ctx, query)
2✔
278
        if err != nil {
2✔
279
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
×
280
        }
×
281
        defer rows.Close()
2✔
282

2✔
283
        for rows.Next() {
5✔
284
                var kind, endpoint string
3✔
285
                err = rows.Scan(&kind, &endpoint)
3✔
286
                if err != nil {
3✔
287
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
288
                }
×
289

290
                services = append(services, &registry.Service{
3✔
291
                        Kind:     registry.ServiceKind(registry.ServiceKind_value[kind]),
3✔
292
                        Endpoint: endpoint,
3✔
293
                })
3✔
294
        }
295
        err = rows.Err()
2✔
296
        if err != nil {
2✔
297
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
298
        }
×
299

300
        return services, nil
2✔
301
}
302

303
func listTypes(ctx context.Context, q querier) ([]string, error) {
11✔
304
        query := `
11✔
305
                SELECT DISTINCT type
11✔
306
                FROM ` + schemaName + `.` + recordsTableName + `
11✔
307
        `
11✔
308
        rows, err := q.Query(ctx, query)
11✔
309
        if err != nil {
21✔
310
                return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
10✔
311
        }
10✔
312
        defer rows.Close()
1✔
313

1✔
314
        var types []string
1✔
315
        for rows.Next() {
5✔
316
                var recordType string
4✔
317
                err = rows.Scan(&recordType)
4✔
318
                if err != nil {
4✔
319
                        return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
320
                }
×
321

322
                types = append(types, recordType)
4✔
323
        }
324
        err = rows.Err()
1✔
325
        if err != nil {
1✔
326
                return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
×
327
        }
×
328

329
        sort.Strings(types)
1✔
330
        return types, nil
1✔
331
}
332

333
func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) {
15✔
334
        tbl := schemaName + "." + leasesTableName
15✔
335
        expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl)))
15✔
336
        now := timestamptzFromTimestamppb(timestamppb.Now())
15✔
337
        err = q.QueryRow(ctx, `
15✔
338
                INSERT INTO `+tbl+` (name, id, expires_at)
15✔
339
                VALUES ($1, $2, $3)
15✔
340
                ON CONFLICT (name) DO UPDATE
15✔
341
                SET id=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $2 ELSE `+tbl+`.id END,
15✔
342
                    expires_at=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $3 ELSE `+tbl+`.expires_at END
15✔
343
                RETURNING `+tbl+`.id
15✔
344
        `, leaseName, leaseID, expiresAt, now).Scan(&leaseHolderID)
15✔
345
        return leaseHolderID, err
15✔
346
}
15✔
347

348
func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error {
1,380✔
349
        data, err := jsonbFromAny(record.GetData())
1,380✔
350
        if err != nil {
1,380✔
351
                return fmt.Errorf("postgres: failed to convert any to json: %w", err)
×
352
        }
×
353

354
        modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt())
1,380✔
355
        deletedAt := timestamptzFromTimestamppb(record.GetDeletedAt())
1,380✔
356
        indexCIDR := &pgtype.Text{Valid: false}
1,380✔
357
        if cidr := storage.GetRecordIndexCIDR(record.GetData()); cidr != nil {
1,384✔
358
                indexCIDR.String = cidr.String()
4✔
359
                indexCIDR.Valid = true
4✔
360
        }
4✔
361

362
        query := `
1,380✔
363
                WITH t1 AS (
1,380✔
364
                        INSERT INTO ` + schemaName + `.` + recordChangesTableName + ` (type, id, data, modified_at, deleted_at)
1,380✔
365
                        VALUES ($1, $2, $3, $4, $5)
1,380✔
366
                        RETURNING *
1,380✔
367
                )
1,380✔
368
        `
1,380✔
369
        args := []any{
1,380✔
370
                record.GetType(), record.GetId(), data, modifiedAt, deletedAt,
1,380✔
371
        }
1,380✔
372
        if record.GetDeletedAt() == nil {
2,758✔
373
                query += `
1,378✔
374
                        INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
1,378✔
375
                        VALUES ($1, $2, (SELECT version FROM t1), $3, $4, $6)
1,378✔
376
                        ON CONFLICT (type, id) DO UPDATE
1,378✔
377
                        SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
1,378✔
378
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
1,378✔
379
                `
1,378✔
380
                args = append(args, indexCIDR)
1,378✔
381
        } else {
1,380✔
382
                query += `
2✔
383
                        DELETE FROM ` + schemaName + `.` + recordsTableName + `
2✔
384
                        WHERE type=$1 AND id=$2
2✔
385
                        RETURNING ` + schemaName + `.` + recordsTableName + `.version
2✔
386
                `
2✔
387
        }
2✔
388
        err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
1,380✔
389
        if err != nil && !isNotFound(err) {
2,390✔
390
                return fmt.Errorf("postgres: failed to execute query: %w", err)
1,010✔
391
        }
1,010✔
392

393
        return nil
370✔
394
}
395

396
// patchRecord updates specific fields of an existing record.
397
func patchRecord(
398
        ctx context.Context, p *pgxpool.Pool, record *databroker.Record, fields *fieldmaskpb.FieldMask,
399
) error {
204✔
400
        tx, err := p.Begin(ctx)
204✔
401
        if err != nil {
204✔
402
                return err
×
403
        }
×
404
        defer func() { _ = tx.Rollback(ctx) }()
408✔
405

406
        existing, err := getRecord(ctx, tx, record.GetType(), record.GetId(), lockModeUpdate)
204✔
407
        if isNotFound(err) {
206✔
408
                return storage.ErrNotFound
2✔
409
        } else if err != nil {
204✔
410
                return err
×
411
        }
×
412

413
        if err := storage.PatchRecord(existing, record, fields); err != nil {
202✔
414
                return err
×
415
        }
×
416

417
        if err := putRecordAndChange(ctx, tx, record); err != nil {
202✔
418
                return err
×
419
        }
×
420

421
        return tx.Commit(ctx)
202✔
422
}
423

424
func putService(ctx context.Context, q querier, svc *registry.Service, expiresAt time.Time) error {
4✔
425
        query := `
4✔
426
                INSERT INTO ` + schemaName + `.` + servicesTableName + ` (kind, endpoint, expires_at)
4✔
427
                VALUES ($1, $2, $3)
4✔
428
                ON CONFLICT (kind, endpoint) DO UPDATE
4✔
429
                SET expires_at=$3
4✔
430
        `
4✔
431
        _, err := q.Exec(ctx, query, svc.GetKind().String(), svc.GetEndpoint(), expiresAt)
4✔
432
        return err
4✔
433
}
4✔
434

435
func setCheckpoint(ctx context.Context, q querier, serverVersion, recordVersion uint64) error {
1✔
436
        var sv, rv pgtype.Numeric
1✔
437
        err := sv.Scan(strconv.FormatUint(serverVersion, 10))
1✔
438
        if err != nil {
1✔
439
                return err
×
440
        }
×
441
        err = rv.Scan(strconv.FormatUint(recordVersion, 10))
1✔
442
        if err != nil {
1✔
443
                return err
×
444
        }
×
445
        _, err = q.Exec(ctx, `
1✔
446
                UPDATE `+schemaName+`.`+checkpointsTableName+`
1✔
447
                SET server_version=$1, record_version=$2
1✔
448
        `, sv, rv)
1✔
449
        return err
1✔
450
}
451

452
func setOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error {
7✔
453
        capacity := pgtype.Int8{}
7✔
454
        if options != nil && options.Capacity != nil {
9✔
455
                capacity.Int64 = int64(options.GetCapacity())
2✔
456
                capacity.Valid = true
2✔
457
        }
2✔
458

459
        _, err := q.Exec(ctx, `
7✔
460
                INSERT INTO `+schemaName+`.`+recordOptionsTableName+` (type, capacity)
7✔
461
                VALUES ($1, $2)
7✔
462
                ON CONFLICT (type) DO UPDATE
7✔
463
                SET capacity=$2
7✔
464
        `, recordType, capacity)
7✔
465
        return err
7✔
466
}
467

468
func signalRecordChange(ctx context.Context, q querier) error {
339✔
469
        _, err := q.Exec(ctx, `NOTIFY `+recordChangeNotifyName)
339✔
470
        return err
339✔
471
}
339✔
472

473
func signalServiceChange(ctx context.Context, q querier) error {
2✔
474
        _, err := q.Exec(ctx, `NOTIFY `+serviceChangeNotifyName)
2✔
475
        return err
2✔
476
}
2✔
477

478
func jsonbFromAny(a *anypb.Any) ([]byte, error) {
1,380✔
479
        if a == nil {
2,391✔
480
                return nil, nil
1,011✔
481
        }
1,011✔
482

483
        return protojson.Marshal(a)
369✔
484
}
485

486
func timestamppbFromTimestamptz(ts pgtype.Timestamptz) *timestamppb.Timestamp {
1,150✔
487
        if !ts.Valid {
1,570✔
488
                return nil
420✔
489
        }
420✔
490
        return timestamppb.New(ts.Time)
730✔
491
}
492

493
func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz {
2,790✔
494
        if !ts.IsValid() {
4,168✔
495
                return pgtype.Timestamptz{}
1,378✔
496
        }
1,378✔
497
        return pgtype.Timestamptz{Time: ts.AsTime(), Valid: true}
1,412✔
498
}
499

500
func isNotFound(err error) bool {
2,654✔
501
        return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound)
2,654✔
502
}
2,654✔
503

504
func isUnknownType(err error) bool {
730✔
505
        if err == nil {
1,457✔
506
                return false
727✔
507
        }
727✔
508

509
        return errors.Is(err, protoregistry.NotFound) ||
3✔
510
                strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string
3✔
511
}
512

513
func collectRecord(row pgx.CollectableRow) (*databroker.Record, error) {
421✔
514
        var recordType, id string
421✔
515
        var version uint64
421✔
516
        var data []byte
421✔
517
        var modifiedAt pgtype.Timestamptz
421✔
518
        var deletedAt pgtype.Timestamptz
421✔
519
        err := row.Scan(&recordType, &id, &version, &data, &modifiedAt, &deletedAt)
421✔
520
        if err != nil {
421✔
521
                return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
×
522
        }
×
523

524
        a, err := protoutil.UnmarshalAnyJSON(data)
421✔
525
        if isUnknownType(err) || len(data) == 0 {
423✔
526
                a = protoutil.ToAny(protoutil.ToStruct(map[string]string{
2✔
527
                        "id": id,
2✔
528
                }))
2✔
529
        } else if err != nil {
421✔
530
                return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
×
531
        }
×
532

533
        return &databroker.Record{
421✔
534
                Version:    version,
421✔
535
                Type:       recordType,
421✔
536
                Id:         id,
421✔
537
                Data:       a,
421✔
538
                ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
421✔
539
                DeletedAt:  timestamppbFromTimestamptz(deletedAt),
421✔
540
        }, nil
421✔
541
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc