• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 18690754649

21 Oct 2025 04:27PM UTC coverage: 53.953% (+0.02%) from 53.929%
18690754649

push

github

web-flow
endpoints: add paths (#5888)

## Summary
Add additional paths to the `endpoints` package.


## Checklist

- [ ] reference any related issues
- [x] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [x] ready for review

60 of 76 new or added lines in 22 files covered. (78.95%)

8 existing lines in 5 files now uncovered.

27424 of 50829 relevant lines covered (53.95%)

86.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.67
/pkg/storage/postgres/backend.go
1
package postgres
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "time"
9

10
        "github.com/cenkalti/backoff/v4"
11
        "github.com/exaring/otelpgx"
12
        "github.com/jackc/pgx/v5/pgxpool"
13
        "google.golang.org/protobuf/types/known/fieldmaskpb"
14
        "google.golang.org/protobuf/types/known/timestamppb"
15

16
        "github.com/pomerium/pomerium/internal/log"
17
        "github.com/pomerium/pomerium/internal/signal"
18
        "github.com/pomerium/pomerium/pkg/contextutil"
19
        "github.com/pomerium/pomerium/pkg/cryptutil"
20
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
21
        "github.com/pomerium/pomerium/pkg/health"
22
        "github.com/pomerium/pomerium/pkg/storage"
23
)
24

25
// Backend is a storage Backend implemented with Postgres.
26
type Backend struct {
27
        cfg              *config
28
        dsn              string
29
        onRecordChange   *signal.Signal
30
        onServiceChange  *signal.Signal
31
        iteratorCanceler contextutil.Canceler
32

33
        closeCtx context.Context
34
        close    context.CancelFunc
35

36
        mu            sync.RWMutex
37
        pool          *pgxpool.Pool
38
        serverVersion uint64
39
}
40

41
// New creates a new Backend.
42
func New(ctx context.Context, dsn string, options ...Option) *Backend {
6✔
43
        backend := &Backend{
6✔
44
                cfg:              getConfig(options...),
6✔
45
                dsn:              dsn,
6✔
46
                onRecordChange:   signal.New(),
6✔
47
                onServiceChange:  signal.New(),
6✔
48
                iteratorCanceler: contextutil.NewCanceler(),
6✔
49
        }
6✔
50
        backend.closeCtx, backend.close = context.WithCancel(ctx)
6✔
51

6✔
52
        go backend.doOnceAndPeriodically(func(ctx context.Context) error {
19✔
53
                _, pool, err := backend.init(ctx)
13✔
54
                if errors.Is(err, context.Canceled) {
14✔
55
                        return nil
1✔
56
                }
1✔
57
                if err != nil {
12✔
58
                        return err
×
59
                }
×
60
                rowCount, err := deleteExpiredServices(ctx, pool, time.Now())
12✔
61
                if err != nil {
13✔
62
                        return err
1✔
63
                }
1✔
64
                if rowCount > 0 {
11✔
65
                        err = signalServiceChange(ctx, pool)
×
66
                        if err != nil {
×
67
                                return err
×
68
                        }
×
69
                }
70
                if err != nil {
11✔
71
                        health.ReportError(health.StorageBackendCleanup, err, backend.healthAttrs()...)
×
72
                } else {
11✔
73
                        health.ReportRunning(health.StorageBackendCleanup, backend.healthAttrs()...)
11✔
74
                }
11✔
75
                return err
11✔
76
        }, backend.cfg.registryTTL/2)
77

78
        go backend.doOnceAndPeriodically(func(ctx context.Context) error {
18✔
79
                err := backend.listenForNotifications(ctx)
12✔
80
                if errors.Is(err, context.Canceled) {
24✔
81
                        return nil
12✔
82
                }
12✔
83
                return err
×
84
        }, time.Millisecond*100)
85

86
        go backend.doOnceAndPeriodically(func(ctx context.Context) error {
18✔
87
                err := backend.ping(ctx)
12✔
88
                // ignore canceled errors
12✔
89
                if errors.Is(err, context.Canceled) {
12✔
UNCOV
90
                        return nil
×
UNCOV
91
                }
×
92

93
                if err != nil {
12✔
94
                        health.ReportError(health.StorageBackend, err, backend.healthAttrs()...)
×
95
                } else {
12✔
96
                        health.ReportRunning(health.StorageBackend, backend.healthAttrs()...)
12✔
97
                }
12✔
98
                return nil
12✔
99
        }, time.Minute)
100

101
        return backend
6✔
102
}
103

104
// Close closes the underlying database connection.
105
func (backend *Backend) Close() error {
6✔
106
        backend.mu.Lock()
6✔
107
        defer backend.mu.Unlock()
6✔
108

6✔
109
        backend.close()
6✔
110

6✔
111
        if backend.pool != nil {
11✔
112
                backend.pool.Close()
5✔
113
                backend.pool = nil
5✔
114
        }
5✔
115
        return nil
6✔
116
}
117

118
// Clean removes all changes before the given cutoff.
119
func (backend *Backend) Clean(ctx context.Context, options storage.CleanOptions) error {
1✔
120
        _, pool, err := backend.init(ctx)
1✔
121
        if err != nil {
1✔
122
                return err
×
123
        }
×
124

125
        return deleteChangesBefore(ctx, pool, options.RemoveRecordChangesBefore)
1✔
126
}
127

128
// Clear removes all records from the storage backend.
129
func (backend *Backend) Clear(ctx context.Context) error {
1✔
130
        _, pool, err := backend.init(ctx)
1✔
131
        if err != nil {
1✔
132
                return err
×
133
        }
×
134

135
        backend.mu.Lock()
1✔
136
        defer backend.mu.Unlock()
1✔
137

1✔
138
        newServerVersion := uint64(cryptutil.NewRandomUInt32())
1✔
139
        err = clearRecords(ctx, pool, newServerVersion)
1✔
140
        if err != nil {
1✔
141
                return err
×
142
        }
×
143
        backend.serverVersion = newServerVersion
1✔
144
        backend.iteratorCanceler.Cancel(nil)
1✔
145
        return nil
1✔
146
}
147

148
// Get gets a record from the database.
149
func (backend *Backend) Get(
150
        ctx context.Context,
151
        recordType, recordID string,
152
) (*databroker.Record, error) {
1,109✔
153
        ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
1,109✔
154
        defer cancel(nil)
1,109✔
155

1,109✔
156
        _, conn, err := backend.init(ctx)
1,109✔
157
        if err != nil {
1,109✔
158
                return nil, err
×
159
        }
×
160

161
        return getRecord(ctx, conn, recordType, recordID, lockModeNone)
1,109✔
162
}
163

164
// GetCheckpoint gets the latest checkpoint.
165
func (backend *Backend) GetCheckpoint(
166
        ctx context.Context,
167
) (serverVersion, recordVersion uint64, err error) {
1✔
168
        _, pool, err := backend.init(ctx)
1✔
169
        if err != nil {
1✔
170
                return 0, 0, err
×
171
        }
×
172
        return getCheckpoint(ctx, pool)
1✔
173
}
174

175
// GetOptions returns the options for the given record type.
176
func (backend *Backend) GetOptions(
177
        ctx context.Context,
178
        recordType string,
179
) (*databroker.Options, error) {
1✔
180
        ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
1✔
181
        defer cancel(nil)
1✔
182

1✔
183
        _, conn, err := backend.init(ctx)
1✔
184
        if err != nil {
1✔
185
                return nil, err
×
186
        }
×
187

188
        return getOptions(ctx, conn, recordType)
1✔
189
}
190

191
// Lease attempts to acquire a lease for the given name.
192
func (backend *Backend) Lease(
193
        ctx context.Context,
194
        leaseName, leaseID string,
195
        ttl time.Duration,
196
) (acquired bool, err error) {
16✔
197
        ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
16✔
198
        defer cancel(nil)
16✔
199

16✔
200
        _, conn, err := backend.init(ctx)
16✔
201
        if err != nil {
16✔
202
                return false, err
×
203
        }
×
204

205
        leaseHolderID, err := maybeAcquireLease(ctx, conn, leaseName, leaseID, ttl)
16✔
206
        if err != nil {
16✔
207
                return false, err
×
208
        }
×
209

210
        return leaseHolderID == leaseID, nil
16✔
211
}
212

213
// ListTypes lists the record types.
214
func (backend *Backend) ListTypes(ctx context.Context) ([]string, error) {
11✔
215
        ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
11✔
216
        defer cancel(nil)
11✔
217

11✔
218
        _, conn, err := backend.init(ctx)
11✔
219
        if err != nil {
11✔
220
                return nil, err
×
221
        }
×
222

223
        return listTypes(ctx, conn)
11✔
224
}
225

226
// Put puts a record into Postgres.
227
func (backend *Backend) Put(
228
        ctx context.Context,
229
        records []*databroker.Record,
230
) (serverVersion uint64, err error) {
1,141✔
231
        ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
1,141✔
232
        defer cancel(nil)
1,141✔
233

1,141✔
234
        serverVersion, pool, err := backend.init(ctx)
1,141✔
235
        if err != nil {
1,141✔
236
                return 0, err
×
237
        }
×
238

239
        now := timestamppb.Now()
1,141✔
240

1,141✔
241
        // add all the records
1,141✔
242
        recordTypes := map[string]struct{}{}
1,141✔
243
        for i, record := range records {
2,291✔
244
                recordTypes[record.GetType()] = struct{}{}
1,150✔
245

1,150✔
246
                record = dup(record)
1,150✔
247
                record.ModifiedAt = now
1,150✔
248
                err := putRecordAndChange(ctx, pool, record)
1,150✔
249
                if err != nil {
2,160✔
250
                        return serverVersion, fmt.Errorf("storage/postgres: error saving record: %w", err)
1,010✔
251
                }
1,010✔
252
                records[i] = record
140✔
253
        }
254

255
        // enforce options for each record type
256
        for recordType := range recordTypes {
262✔
257
                options, err := getOptions(ctx, pool, recordType)
131✔
258
                if err != nil {
131✔
259
                        return serverVersion, fmt.Errorf("storage/postgres: error getting options: %w", err)
×
260
                }
×
261
                err = enforceOptions(ctx, pool, recordType, options)
131✔
262
                if err != nil {
131✔
263
                        return serverVersion, fmt.Errorf("storage/postgres: error enforcing options: %w", err)
×
264
                }
×
265
        }
266

267
        err = signalRecordChange(ctx, pool)
131✔
268
        return serverVersion, err
131✔
269
}
270

271
// Patch updates specific fields of existing records in Postgres.
272
func (backend *Backend) Patch(
273
        ctx context.Context,
274
        records []*databroker.Record,
275
        fields *fieldmaskpb.FieldMask,
276
) (uint64, []*databroker.Record, error) {
202✔
277
        ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
202✔
278
        defer cancel(nil)
202✔
279

202✔
280
        serverVersion, pool, err := backend.init(ctx)
202✔
281
        if err != nil {
202✔
282
                return serverVersion, nil, err
×
283
        }
×
284

285
        patchedRecords := make([]*databroker.Record, 0, len(records))
202✔
286

202✔
287
        now := timestamppb.Now()
202✔
288

202✔
289
        for _, record := range records {
406✔
290
                record = dup(record)
204✔
291
                record.ModifiedAt = now
204✔
292
                err := patchRecord(ctx, pool, record, fields)
204✔
293
                if storage.IsNotFound(err) {
206✔
294
                        continue
2✔
295
                } else if err != nil {
202✔
296
                        err = fmt.Errorf("storage/postgres: error patching record %q of type %q: %w",
×
297
                                record.GetId(), record.GetType(), err)
×
298
                        return serverVersion, patchedRecords, err
×
299
                }
×
300
                patchedRecords = append(patchedRecords, record)
202✔
301
        }
302

303
        err = signalRecordChange(ctx, pool)
202✔
304
        return serverVersion, patchedRecords, err
202✔
305
}
306

307
// SetCheckpoint sets the latest checkpoint.
308
func (backend *Backend) SetCheckpoint(
309
        ctx context.Context,
310
        serverVersion, recordVersion uint64,
311
) error {
1✔
312
        _, pool, err := backend.init(ctx)
1✔
313
        if err != nil {
1✔
314
                return err
×
315
        }
×
316
        return setCheckpoint(ctx, pool, serverVersion, recordVersion)
1✔
317
}
318

319
// SetOptions sets the options for the given record type.
320
func (backend *Backend) SetOptions(
321
        ctx context.Context,
322
        recordType string,
323
        options *databroker.Options,
324
) error {
2✔
325
        ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
2✔
326
        defer cancel(nil)
2✔
327

2✔
328
        _, conn, err := backend.init(ctx)
2✔
329
        if err != nil {
2✔
330
                return err
×
331
        }
×
332

333
        return setOptions(ctx, conn, recordType, options)
2✔
334
}
335

336
// Sync syncs the records.
337
func (backend *Backend) Sync(
338
        ctx context.Context,
339
        recordType string,
340
        serverVersion, recordVersion uint64,
341
        wait bool,
342
) storage.RecordIterator {
17✔
343
        return backend.iterateChangedRecords(ctx, recordType, serverVersion, recordVersion, wait)
17✔
344
}
17✔
345

346
// SyncLatest syncs the latest version of each record.
347
func (backend *Backend) SyncLatest(
348
        ctx context.Context,
349
        recordType string,
350
        expr storage.FilterExpression,
351
) (serverVersion, recordVersion uint64, seq storage.RecordIterator, err error) {
20✔
352
        // the original ctx will be used for the stream, this ctx used for pre-stream calls
20✔
353
        callCtx, cancel := contextutil.Merge(ctx, backend.closeCtx)
20✔
354
        defer cancel(nil)
20✔
355

20✔
356
        serverVersion, pool, err := backend.init(callCtx)
20✔
357
        if err != nil {
20✔
358
                return 0, 0, nil, err
×
359
        }
×
360

361
        _, recordVersion, err = getRecordVersionRange(callCtx, pool)
20✔
362
        if err != nil {
20✔
363
                return 0, 0, nil, err
×
364
        }
×
365

366
        if recordType != "" {
35✔
367
                f := storage.EqualsFilterExpression{
15✔
368
                        Fields: []string{"type"},
15✔
369
                        Value:  recordType,
15✔
370
                }
15✔
371
                if expr != nil {
17✔
372
                        expr = storage.AndFilterExpression{expr, f}
2✔
373
                } else {
15✔
374
                        expr = f
13✔
375
                }
13✔
376
        }
377
        return serverVersion, recordVersion, backend.iterateLatestRecords(ctx, expr), nil
20✔
378
}
379

380
// Versions returns the versions of the storage backend.
381
func (backend *Backend) Versions(ctx context.Context) (serverVersion, earliestRecordVersion, latestRecordVersion uint64, err error) {
3✔
382
        serverVersion, pool, err := backend.init(ctx)
3✔
383
        if err != nil {
3✔
384
                return 0, 0, 0, err
×
385
        }
×
386

387
        earliestRecordVersion, latestRecordVersion, err = getRecordVersionRange(ctx, pool)
3✔
388
        if err != nil {
3✔
389
                return 0, 0, 0, err
×
390
        }
×
391

392
        return serverVersion, earliestRecordVersion, latestRecordVersion, nil
3✔
393
}
394

395
func (backend *Backend) init(ctx context.Context) (serverVersion uint64, pool *pgxpool.Pool, err error) {
2,588✔
396
        backend.mu.RLock()
2,588✔
397
        serverVersion = backend.serverVersion
2,588✔
398
        pool = backend.pool
2,588✔
399
        backend.mu.RUnlock()
2,588✔
400

2,588✔
401
        if pool != nil {
5,162✔
402
                return serverVersion, pool, nil
2,574✔
403
        }
2,574✔
404

405
        backend.mu.Lock()
14✔
406
        defer backend.mu.Unlock()
14✔
407

14✔
408
        // double-checked locking, might have already initialized, so just return
14✔
409
        serverVersion = backend.serverVersion
14✔
410
        pool = backend.pool
14✔
411
        if pool != nil {
15✔
412
                return serverVersion, pool, nil
1✔
413
        }
1✔
414

415
        config, err := pgxpool.ParseConfig(backend.dsn)
13✔
416
        if err != nil {
13✔
417
                return serverVersion, nil, err
×
418
        }
×
419

420
        if backend.cfg.tracerProvider != nil {
14✔
421
                config.ConnConfig.Tracer = otelpgx.NewTracer(
1✔
422
                        otelpgx.WithTracerProvider(backend.cfg.tracerProvider))
1✔
423
        }
1✔
424

425
        pool, err = pgxpool.NewWithConfig(context.Background(), config)
13✔
426
        if err != nil {
13✔
427
                return serverVersion, nil, fmt.Errorf("error creating pgxpool: %w", err)
×
428
        }
×
429

430
        err = otelpgx.RecordStats(pool)
13✔
431
        if err != nil {
13✔
432
                return serverVersion, nil, fmt.Errorf("error recording stats: %w", err)
×
433
        }
×
434

435
        tx, err := pool.Begin(ctx)
13✔
436
        if err != nil {
20✔
437
                return serverVersion, nil, fmt.Errorf("error starting transaction: %w", err)
7✔
438
        }
7✔
439

440
        serverVersion, err = migrate(ctx, tx)
6✔
441
        if err != nil {
6✔
442
                _ = tx.Rollback(ctx)
×
443
                return serverVersion, nil, fmt.Errorf("error running migrations: %w", err)
×
444
        }
×
445

446
        err = tx.Commit(ctx)
6✔
447
        if err != nil {
6✔
448
                _ = tx.Rollback(ctx)
×
449
                return serverVersion, nil, fmt.Errorf("error committing transaction: %w", err)
×
450
        }
×
451

452
        backend.serverVersion = serverVersion
6✔
453
        backend.pool = pool
6✔
454
        return serverVersion, pool, nil
6✔
455
}
456

457
func (backend *Backend) doOnceAndPeriodically(f func(ctx context.Context) error, dur time.Duration) {
18✔
458
        ctx := backend.closeCtx
18✔
459
        if err := f(ctx); err != nil {
19✔
460
                log.Ctx(ctx).Error().Err(err).Msg("storage/postgres")
1✔
461
        }
1✔
462
        backend.doPeriodically(f, dur)
18✔
463
}
464

465
func (backend *Backend) doPeriodically(f func(ctx context.Context) error, dur time.Duration) {
18✔
466
        ctx := backend.closeCtx
18✔
467

18✔
468
        ticker := time.NewTicker(dur)
18✔
469
        defer ticker.Stop()
18✔
470

18✔
471
        bo := backoff.NewExponentialBackOff()
18✔
472
        bo.MaxElapsedTime = 0
18✔
473

18✔
474
        for {
37✔
475
                err := f(ctx)
19✔
476
                if err == nil {
38✔
477
                        bo.Reset()
19✔
478
                        select {
19✔
479
                        case <-backend.closeCtx.Done():
18✔
480
                                return
18✔
481
                        case <-ticker.C:
1✔
482
                        }
483
                } else {
×
484
                        if !errors.Is(err, context.Canceled) {
×
485
                                log.Ctx(ctx).Error().Err(err).Msg("storage/postgres")
×
486
                        }
×
487
                        select {
×
488
                        case <-backend.closeCtx.Done():
×
489
                                return
×
490
                        case <-time.After(bo.NextBackOff()):
×
491
                        }
492
                }
493
        }
494
}
495

496
func (backend *Backend) listenForNotifications(ctx context.Context) error {
12✔
497
        _, pool, err := backend.init(ctx)
12✔
498
        if err != nil {
17✔
499
                return fmt.Errorf("error initializing pool for notifications: %w", err)
5✔
500
        }
5✔
501

502
        poolConn, err := pool.Acquire(ctx)
7✔
503
        if err != nil {
9✔
504
                return fmt.Errorf("error acquiring connection from pool for notifications: %w", err)
2✔
505
        }
2✔
506

507
        // hijack the connection so the pool can be left for short-lived queries
508
        // and so that LISTENs don't leak to other queries
509
        conn := poolConn.Hijack()
5✔
510
        defer conn.Close(ctx)
5✔
511

5✔
512
        for _, ch := range []string{recordChangeNotifyName, serviceChangeNotifyName} {
15✔
513
                _, err = conn.Exec(ctx, `LISTEN `+ch)
10✔
514
                if err != nil {
10✔
515
                        return fmt.Errorf("error listening on channel %s for notifications: %w", ch, err)
×
516
                }
×
517
        }
518

519
        // for each notification broadcast the signal
520
        for {
340✔
521
                n, err := conn.WaitForNotification(ctx)
335✔
522
                if err != nil {
340✔
523
                        // on error we'll close the connection to stop listening
5✔
524
                        return fmt.Errorf("error receiving notification: %w", err)
5✔
525
                }
5✔
526

527
                switch n.Channel {
330✔
528
                case recordChangeNotifyName:
329✔
529
                        backend.onRecordChange.Broadcast(ctx)
329✔
530
                case serviceChangeNotifyName:
1✔
531
                        backend.onServiceChange.Broadcast(ctx)
1✔
532
                }
533
        }
534
}
535

536
func (backend *Backend) ping(ctx context.Context) error {
12✔
537
        _, pool, err := backend.init(ctx)
12✔
538
        if err != nil {
12✔
UNCOV
539
                return err
×
UNCOV
540
        }
×
541

542
        return pool.Ping(ctx)
12✔
543
}
544

545
func (backend *Backend) healthAttrs() []health.Attr {
23✔
546
        return []health.Attr{{Key: "backend", Value: "postgres"}}
23✔
547
}
23✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc