• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 20869446095

10 Jan 2026 12:18AM UTC coverage: 53.08% (+0.07%) from 53.011%
20869446095

push

github

web-flow
core/autocert: improve test performance (#6039)

## Summary
Switch from a minio docker container to
https://github.com/johannesboyne/gofakes3 for testing s3 storage with
autocert. This allows tests to run faster.

Also implement the `TryLocker` interface so we can avoid blocking some
of the time, and support `username:password` in urls. (which allows us
to avoid setting environment variables for the test)

## Related issues
-
[ENG-3445](https://linear.app/pomerium/issue/ENG-3445/coreautocert-tests-are-slow)


## Checklist

- [x] reference any related issues
- [x] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [x] ready for review

24 of 43 new or added lines in 5 files covered. (55.81%)

13 existing lines in 3 files now uncovered.

29758 of 56063 relevant lines covered (53.08%)

127.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.23
/pkg/ssh/manager.go
1
package ssh
2

3
import (
4
        "cmp"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "slices"
9
        "strconv"
10
        "sync"
11
        "time"
12

13
        corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
14
        envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
15
        discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
16
        endpointv3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
17
        "github.com/envoyproxy/go-control-plane/pkg/cache/types"
18
        "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
19
        "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
20
        "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
21
        "github.com/rs/zerolog"
22
        "golang.org/x/sync/errgroup"
23
        "google.golang.org/grpc/codes"
24
        "google.golang.org/grpc/status"
25
        "google.golang.org/protobuf/types/known/anypb"
26

27
        extensions_ssh "github.com/pomerium/envoy-custom/api/extensions/filters/network/ssh"
28
        "github.com/pomerium/pomerium/config"
29
        "github.com/pomerium/pomerium/internal/log"
30
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
31
        "github.com/pomerium/pomerium/pkg/grpc/session"
32
        "github.com/pomerium/pomerium/pkg/ssh/portforward"
33
)
34

35
type streamClusterEndpointDiscovery struct {
36
        self     *StreamManager
37
        streamID uint64
38
}
39

40
type streamEndpointsUpdate struct {
41
        streamID uint64
42
        added    map[string]portforward.RoutePortForwardInfo
43
        removed  map[string]struct{}
44
}
45

46
// UpdateClusterEndpoints implements EndpointDiscoveryInterface.
47
func (ed *streamClusterEndpointDiscovery) UpdateClusterEndpoints(added map[string]portforward.RoutePortForwardInfo, removed map[string]struct{}) {
55✔
48
        // run this callback in a separate goroutine, since it can deadlock if called
55✔
49
        // synchronously during startup
55✔
50
        ed.self.endpointsUpdateQueue <- streamEndpointsUpdate{
55✔
51
                streamID: ed.streamID,
55✔
52
                added:    added,
55✔
53
                removed:  removed,
55✔
54
        }
55✔
55
}
55✔
56

57
// PortForwardManager implements EndpointDiscoveryInterface.
58
func (ed *streamClusterEndpointDiscovery) PortForwardManager() *portforward.Manager {
6✔
59
        return ed.self.getPortForwardManagerForStream(ed.streamID)
6✔
60
}
6✔
61

62
var _ EndpointDiscoveryInterface = (*streamClusterEndpointDiscovery)(nil)
63

64
var ErrReauthDone = errors.New("reauth loop done")
65

66
type activeStream struct {
67
        Handler            *StreamHandler
68
        PortForwardManager *portforward.Manager
69
        Session            *string
70
        SessionBindingID   *string
71
        Endpoints          map[string]struct{}
72
}
73

74
type StreamManager struct {
75
        endpointv3.UnimplementedEndpointDiscoveryServiceServer
76
        ready                            chan struct{}
77
        logger                           *zerolog.Logger
78
        auth                             AuthInterface
79
        reauthC                          chan struct{}
80
        initialSessionSyncDone           bool
81
        initialSessionBindingSyncDone    bool
82
        waitForInitialSessionSync        chan struct{}
83
        waitForInitialSessionBindingSync chan struct{}
84

85
        mu sync.Mutex
86

87
        cfg           *config.Config
88
        activeStreams map[uint64]*activeStream
89

90
        // Tracks stream IDs for active sessions
91
        sessionStreams map[string]map[uint64]struct{}
92

93
        // Tracks stream IDs per sessionBindingID for active sessions
94
        bindingStreams map[string]map[uint64]struct{}
95

96
        bindingSyncer *bindingSyncer
97
        // Tracks endpoint stream IDs for clusters
98
        clusterEndpoints     map[string]map[uint64]*extensions_ssh.EndpointMetadata
99
        endpointsUpdateQueue chan streamEndpointsUpdate
100
        edsCache             *cache.LinearCache
101
        edsServer            delta.Server
102
        indexer              PolicyIndexer
103
        cliCtrl              InternalCLIController
104
}
105

106
func (sm *StreamManager) getPortForwardManagerForStream(streamID uint64) *portforward.Manager {
6✔
107
        sm.mu.Lock()
6✔
108
        defer sm.mu.Unlock()
6✔
109
        activeStream := sm.activeStreams[streamID]
6✔
110
        if activeStream == nil || activeStream.PortForwardManager == nil {
6✔
111
                return nil
×
112
        }
×
113
        return activeStream.PortForwardManager // may be nil
6✔
114
}
115

116
// OnDeltaStreamClosed implements delta.Callbacks.
117
func (sm *StreamManager) OnDeltaStreamClosed(int64, *corev3.Node) {
×
118
}
×
119

120
// OnDeltaStreamOpen implements delta.Callbacks.
121
func (sm *StreamManager) OnDeltaStreamOpen(context.Context, int64, string) error {
×
122
        return nil
×
123
}
×
124

125
// OnStreamDeltaRequest implements delta.Callbacks.
126
func (sm *StreamManager) OnStreamDeltaRequest(_ int64, req *discoveryv3.DeltaDiscoveryRequest) error {
×
127
        if len(req.ResourceNamesSubscribe) == 0 {
×
128
                return nil
×
129
        }
×
130
        sm.mu.Lock()
×
131
        defer sm.mu.Unlock()
×
132
        initialEmptyResources := make(map[string]types.Resource)
×
133
        for _, clusterID := range req.ResourceNamesSubscribe {
×
134
                if _, ok := sm.clusterEndpoints[clusterID]; !ok {
×
135
                        sm.clusterEndpoints[clusterID] = map[uint64]*extensions_ssh.EndpointMetadata{}
×
136
                        initialEmptyResources[clusterID] = &envoy_config_endpoint_v3.ClusterLoadAssignment{
×
137
                                ClusterName: clusterID,
×
138
                        }
×
139
                }
×
140
        }
141
        return sm.edsCache.UpdateResources(initialEmptyResources, nil)
×
142
}
143

144
// OnStreamDeltaResponse implements delta.Callbacks.
145
func (sm *StreamManager) OnStreamDeltaResponse(int64, *discoveryv3.DeltaDiscoveryRequest, *discoveryv3.DeltaDiscoveryResponse) {
×
146
}
×
147

148
// DeltaEndpoints implements endpointv3.EndpointDiscoveryServiceServer.
149
func (sm *StreamManager) DeltaEndpoints(stream endpointv3.EndpointDiscoveryService_DeltaEndpointsServer) error {
×
150
        select {
×
151
        case <-stream.Context().Done():
×
152
                return context.Cause(stream.Context())
×
153
        case <-sm.ready:
×
154
        }
155
        log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream started")
×
156
        defer log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream ended")
×
157
        return sm.edsServer.DeltaStreamHandler(stream, resource.EndpointType)
×
158
}
159

160
// ClearRecords implements databroker.SyncerHandler.
161
func (sm *StreamManager) ClearRecords(ctx context.Context) {
147✔
162
        sm.mu.Lock()
147✔
163
        defer sm.mu.Unlock()
147✔
164
        if !sm.initialSessionSyncDone {
293✔
165
                sm.initialSessionSyncDone = true
146✔
166
                close(sm.waitForInitialSessionSync)
146✔
167
                log.Ctx(ctx).Debug().
146✔
168
                        Msg("ssh stream manager: initial sync done")
146✔
169
                return
146✔
170
        }
146✔
171
        for sessionID, streamIDs := range sm.sessionStreams {
3✔
172
                for streamID := range streamIDs {
4✔
173
                        log.Ctx(ctx).Debug().
2✔
174
                                Str("session-id", sessionID).
2✔
175
                                Uint64("stream-id", streamID).
2✔
176
                                Msg("terminating stream: databroker sync reset")
2✔
177
                        sm.terminateStreamLocked(streamID)
2✔
178
                }
2✔
179
        }
180
        clear(sm.sessionStreams)
1✔
181
}
182

183
func (sm *StreamManager) clearRecordsBinding(ctx context.Context) {
40✔
184
        sm.mu.Lock()
40✔
185
        defer sm.mu.Unlock()
40✔
186
        if !sm.initialSessionBindingSyncDone {
80✔
187
                sm.initialSessionBindingSyncDone = true
40✔
188
                close(sm.waitForInitialSessionBindingSync)
40✔
189
                log.Ctx(ctx).Debug().
40✔
190
                        Msg("ssh stream manager: initial sync done")
40✔
191
                return
40✔
192
        }
40✔
UNCOV
193
        for sessionID, streamIDs := range sm.bindingStreams {
×
UNCOV
194
                for streamID := range streamIDs {
×
UNCOV
195
                        log.Ctx(ctx).Debug().
×
UNCOV
196
                                Str("session-binding-id", sessionID).
×
UNCOV
197
                                Uint64("stream-id", streamID).
×
UNCOV
198
                                Msg("terminating stream: databroker sync reset")
×
UNCOV
199
                        sm.terminateStreamLocked(streamID)
×
UNCOV
200
                }
×
201
        }
UNCOV
202
        clear(sm.sessionStreams)
×
203
}
204

205
// GetDataBrokerServiceClient implements databroker.SyncerHandler.
206
func (sm *StreamManager) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
200✔
207
        return sm.auth.GetDataBrokerServiceClient()
200✔
208
}
200✔
209

210
// UpdateRecords implements databroker.SyncerHandler.
211
func (sm *StreamManager) UpdateRecords(ctx context.Context, _ uint64, records []*databroker.Record) {
85✔
212
        sm.mu.Lock()
85✔
213
        defer sm.mu.Unlock()
85✔
214
        for _, record := range records {
129✔
215
                if record.DeletedAt == nil {
86✔
216
                        // New session
42✔
217
                        var s session.Session
42✔
218
                        if err := record.Data.UnmarshalTo(&s); err != nil {
42✔
219
                                log.Ctx(ctx).Err(err).Msg("invalid session object, ignoring")
×
220
                                continue
×
221
                        }
222
                        s.Version = strconv.FormatUint(record.GetVersion(), 10)
42✔
223
                        sm.indexer.OnSessionCreated(&s)
42✔
224
                        continue
42✔
225
                }
226
                // Session was deleted; terminate all of its associated streams
227
                sm.indexer.OnSessionDeleted(record.Id)
2✔
228
                for streamID := range sm.sessionStreams[record.Id] {
5✔
229
                        log.Ctx(ctx).Debug().
3✔
230
                                Str("session-id", record.Id).
3✔
231
                                Uint64("stream-id", streamID).
3✔
232
                                Msg("terminating stream: session revoked")
3✔
233
                        sm.terminateStreamLocked(streamID)
3✔
234
                }
3✔
235
                delete(sm.sessionStreams, record.Id)
2✔
236
        }
237
}
238

239
func (sm *StreamManager) updateRecordsBinding(ctx context.Context, _ uint64, records []*databroker.Record) {
86✔
240
        sm.mu.Lock()
86✔
241
        defer sm.mu.Unlock()
86✔
242
        for _, record := range records {
132✔
243
                if record.DeletedAt == nil {
86✔
244
                        continue
40✔
245
                }
246
                // Session binding was deleted; terminate all of its associated streams
247
                for streamID := range sm.bindingStreams[record.Id] {
12✔
248
                        log.Ctx(ctx).Debug().
6✔
249
                                Str("session-id", record.Id).
6✔
250
                                Uint64("stream-id", streamID).
6✔
251
                                Msg("terminating stream: session binding revoked")
6✔
252
                        sm.terminateStreamLocked(streamID)
6✔
253
                }
6✔
254
                delete(sm.bindingStreams, record.Id)
6✔
255
        }
256
}
257

258
func (sm *StreamManager) OnStreamAuthenticated(ctx context.Context, streamID uint64, req AuthRequest) error {
45✔
259
        if err := sm.waitForInitialSync(ctx); err != nil {
45✔
260
                return err
×
261
        }
×
262
        sm.mu.Lock()
45✔
263
        defer sm.mu.Unlock()
45✔
264
        activeStream := sm.activeStreams[streamID]
45✔
265
        if activeStream.Session != nil || activeStream.SessionBindingID != nil {
45✔
266
                return status.Errorf(codes.Internal, "stream %d already has an associated session", streamID)
×
267
        }
×
268

269
        if sm.sessionStreams[req.SessionID] == nil {
89✔
270
                sm.sessionStreams[req.SessionID] = map[uint64]struct{}{}
44✔
271
        }
44✔
272
        if sm.bindingStreams[req.SessionBindingID] == nil {
87✔
273
                sm.bindingStreams[req.SessionBindingID] = map[uint64]struct{}{}
42✔
274
        }
42✔
275
        sm.sessionStreams[req.SessionID][streamID] = struct{}{}
45✔
276
        sm.bindingStreams[req.SessionBindingID][streamID] = struct{}{}
45✔
277

45✔
278
        activeStream.Session = new(string)
45✔
279
        *activeStream.Session = req.SessionID
45✔
280
        activeStream.SessionBindingID = new(string)
45✔
281
        *activeStream.SessionBindingID = req.SessionBindingID
45✔
282

45✔
283
        activeStream.PortForwardManager.AddUpdateListener(activeStream.Handler)
45✔
284

45✔
285
        sm.indexer.OnStreamAuthenticated(streamID, req)
45✔
286
        return nil
45✔
287
}
288

289
type bindingSyncer struct {
290
        clientHandler func() databroker.DataBrokerServiceClient
291
        clearHandler  func(context.Context)
292
        updateHandler func(context.Context, uint64, []*databroker.Record)
293
}
294

295
var _ databroker.SyncerHandler = (*bindingSyncer)(nil)
296

297
func (sbr *bindingSyncer) ClearRecords(ctx context.Context) {
40✔
298
        sbr.clearHandler(ctx)
40✔
299
}
40✔
300

301
// GetDataBrokerServiceClient implements databroker.SyncerHandler.
302
func (sbr *bindingSyncer) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
103✔
303
        return sbr.clientHandler()
103✔
304
}
103✔
305

306
// UpdateRecords implements databroker.SyncerHandler.
307
func (sbr *bindingSyncer) UpdateRecords(ctx context.Context, serverVersion uint64, records []*databroker.Record) {
86✔
308
        sbr.updateHandler(ctx, serverVersion, records)
86✔
309
}
86✔
310

311
func NewStreamManager(ctx context.Context, auth AuthInterface, indexer PolicyIndexer, cliCtrl InternalCLIController, cfg *config.Config) *StreamManager {
146✔
312
        sm := &StreamManager{
146✔
313
                logger:                           log.Ctx(ctx),
146✔
314
                auth:                             auth,
146✔
315
                ready:                            make(chan struct{}),
146✔
316
                waitForInitialSessionSync:        make(chan struct{}),
146✔
317
                waitForInitialSessionBindingSync: make(chan struct{}),
146✔
318
                reauthC:                          make(chan struct{}, 1),
146✔
319
                cfg:                              cfg,
146✔
320
                activeStreams:                    map[uint64]*activeStream{},
146✔
321
                sessionStreams:                   map[string]map[uint64]struct{}{},
146✔
322
                clusterEndpoints:                 map[string]map[uint64]*extensions_ssh.EndpointMetadata{},
146✔
323
                edsCache:                         cache.NewLinearCache(resource.EndpointType),
146✔
324
                endpointsUpdateQueue:             make(chan streamEndpointsUpdate, 128),
146✔
325
                bindingStreams:                   map[string]map[uint64]struct{}{},
146✔
326
                indexer:                          indexer,
146✔
327
                cliCtrl:                          cliCtrl,
146✔
328
        }
146✔
329

146✔
330
        bindingSyncer := &bindingSyncer{
146✔
331
                clientHandler: sm.GetDataBrokerServiceClient,
146✔
332
                clearHandler:  sm.clearRecordsBinding,
146✔
333
                updateHandler: sm.updateRecordsBinding,
146✔
334
        }
146✔
335

146✔
336
        sm.bindingSyncer = bindingSyncer
146✔
337

146✔
338
        sm.indexer.ProcessConfigUpdate(cfg)
146✔
339
        return sm
146✔
340
}
146✔
341

342
func (sm *StreamManager) waitForInitialSync(ctx context.Context) error {
48✔
343
        sm.mu.Lock()
48✔
344
        for !sm.initialSessionSyncDone || !sm.initialSessionBindingSyncDone {
56✔
345
                sm.mu.Unlock()
8✔
346
                select {
8✔
347
                case <-sm.waitForInitialSessionSync:
7✔
348
                case <-sm.waitForInitialSessionBindingSync:
1✔
349
                case <-time.After(10 * time.Second):
×
350
                        return errors.New("timed out waiting for initial sync")
×
351
                case <-ctx.Done():
×
352
                        return context.Cause(ctx)
×
353
                }
354
                sm.mu.Lock()
8✔
355
        }
356
        sm.mu.Unlock()
48✔
357
        return nil
48✔
358
}
359

360
func (sm *StreamManager) Run(ctx context.Context) error {
40✔
361
        sm.edsServer = delta.NewServer(ctx, sm.edsCache, sm)
40✔
362
        eg, eCtx := errgroup.WithContext(ctx)
40✔
363
        eg.Go(func() error {
80✔
364
                syncer := databroker.NewSyncer(
40✔
365
                        eCtx,
40✔
366
                        "ssh-auth-session-sync",
40✔
367
                        sm,
40✔
368
                        databroker.WithTypeURL("type.googleapis.com/session.Session"))
40✔
369
                return syncer.Run(eCtx)
40✔
370
        })
40✔
371

372
        eg.Go(func() error {
80✔
373
                syncer := databroker.NewSyncer(
40✔
374
                        eCtx,
40✔
375
                        "ssh-auth-session-binding-sync",
40✔
376
                        sm.bindingSyncer,
40✔
377
                        databroker.WithTypeURL("type.googleapis.com/session.SessionBinding"),
40✔
378
                )
40✔
379
                return syncer.Run(eCtx)
40✔
380
        })
40✔
381

382
        eg.Go(func() error {
80✔
383
                sm.reauthLoop(eCtx)
40✔
384
                return ErrReauthDone
40✔
385
        })
40✔
386
        eg.Go(func() error {
80✔
387
                sm.endpointsUpdateLoop(ctx)
40✔
388
                return nil
40✔
389
        })
40✔
390

391
        close(sm.ready)
40✔
392
        err := eg.Wait()
40✔
393
        if errors.Is(err, ErrReauthDone) {
64✔
394
                return nil
24✔
395
        }
24✔
396

397
        return err
16✔
398
}
399

400
func (sm *StreamManager) OnConfigChange(cfg *config.Config) {
45✔
401
        sm.indexer.ProcessConfigUpdate(cfg)
45✔
402

45✔
403
        // TODO: integrate the re-auth mechanism with the indexer
45✔
404
        select {
45✔
405
        case sm.reauthC <- struct{}{}:
45✔
406
        default:
×
407
        }
408
}
409

410
func (sm *StreamManager) LookupStream(streamID uint64) *StreamHandler {
21✔
411
        sm.mu.Lock()
21✔
412
        defer sm.mu.Unlock()
21✔
413
        if info, ok := sm.activeStreams[streamID]; ok {
40✔
414
                return info.Handler
19✔
415
        }
19✔
416
        return nil
2✔
417
}
418

419
func (sm *StreamManager) NewStreamHandler(
420
        _ context.Context,
421
        downstream *extensions_ssh.DownstreamConnectEvent,
422
) *StreamHandler {
152✔
423
        sm.mu.Lock()
152✔
424
        defer sm.mu.Unlock()
152✔
425
        streamID := downstream.StreamId
152✔
426

152✔
427
        onClose := func() {
291✔
428
                sm.onStreamHandlerClosed(streamID)
139✔
429
        }
139✔
430
        discovery := &streamClusterEndpointDiscovery{
152✔
431
                self:     sm,
152✔
432
                streamID: streamID,
152✔
433
        }
152✔
434
        sh := NewStreamHandler(sm.auth, discovery, sm.cliCtrl, sm.cfg, downstream, onClose)
152✔
435
        portForwardMgr := portforward.NewManager()
152✔
436
        sm.activeStreams[streamID] = &activeStream{
152✔
437
                Handler:            sh,
152✔
438
                Endpoints:          map[string]struct{}{},
152✔
439
                PortForwardManager: portForwardMgr,
152✔
440
        }
152✔
441
        sm.indexer.AddStream(streamID, portForwardMgr)
152✔
442
        return sh
152✔
443
}
444

445
func (sm *StreamManager) onStreamHandlerClosed(streamID uint64) {
139✔
446
        sm.mu.Lock()
139✔
447
        defer sm.mu.Unlock()
139✔
448
        info := sm.activeStreams[streamID]
139✔
449
        delete(sm.activeStreams, streamID)
139✔
450

139✔
451
        info.PortForwardManager.RemoveUpdateListener(info.Handler)
139✔
452
        sm.indexer.RemoveStream(streamID)
139✔
453

139✔
454
        if info.Session != nil {
179✔
455
                session := *info.Session
40✔
456
                delete(sm.sessionStreams[session], streamID)
40✔
457
                if len(sm.sessionStreams[session]) == 0 {
80✔
458
                        delete(sm.sessionStreams, session)
40✔
459
                }
40✔
460
        }
461
        if info.SessionBindingID != nil {
179✔
462
                bindingID := *info.SessionBindingID
40✔
463
                delete(sm.bindingStreams[bindingID], streamID)
40✔
464
                if len(sm.bindingStreams[bindingID]) == 0 {
80✔
465
                        delete(sm.bindingStreams, bindingID)
40✔
466
                }
40✔
467
        }
468

469
        if len(info.Endpoints) > 0 {
141✔
470
                sm.logger.Debug().
2✔
471
                        Uint64("stream-id", streamID).
2✔
472
                        Any("endpoints", info.Endpoints).
2✔
473
                        Msg("clearing endpoints for closed stream")
2✔
474
                sm.endpointsUpdateQueue <- streamEndpointsUpdate{
2✔
475
                        streamID: streamID,
2✔
476
                        removed:  info.Endpoints,
2✔
477
                }
2✔
478
        }
2✔
479
}
480

481
func (sm *StreamManager) processStreamEndpointsUpdate(update streamEndpointsUpdate) {
57✔
482
        sm.mu.Lock()
57✔
483
        defer sm.mu.Unlock()
57✔
484
        streamID := update.streamID
57✔
485

57✔
486
        activeStream := sm.activeStreams[streamID] // can be nil
57✔
487

57✔
488
        toUpdate := map[string]types.Resource{} // *envoy_config_endpoint_v3.LbEndpoint
57✔
489
        var toDelete []string
57✔
490
        for clusterID, info := range update.added {
63✔
491
                if activeStream != nil {
12✔
492
                        activeStream.Endpoints[clusterID] = struct{}{}
6✔
493
                }
6✔
494
                if _, ok := sm.clusterEndpoints[clusterID]; !ok {
10✔
495
                        sm.clusterEndpoints[clusterID] = map[uint64]*extensions_ssh.EndpointMetadata{}
4✔
496
                }
4✔
497
                sm.clusterEndpoints[clusterID][streamID] = buildEndpointMetadata(info)
6✔
498
                toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
6✔
499
        }
500
        for clusterID := range update.removed {
63✔
501
                if activeStream != nil {
10✔
502
                        delete(activeStream.Endpoints, clusterID)
4✔
503
                }
4✔
504
                delete(sm.clusterEndpoints[clusterID], streamID)
6✔
505
                if len(sm.clusterEndpoints[clusterID]) == 0 {
10✔
506
                        // No more endpoints for this cluster, so delete the resource. The cluster
4✔
507
                        // will handle this by clearing the endpoints.
4✔
508
                        toDelete = append(toDelete, clusterID)
4✔
509
                        delete(sm.clusterEndpoints, clusterID)
4✔
510
                } else {
6✔
511
                        // There are still endpoints
2✔
512
                        toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
2✔
513
                }
2✔
514
        }
515

516
        if err := sm.edsCache.UpdateResources(toUpdate, toDelete); err != nil {
57✔
517
                sm.logger.Err(err).Msg("error updating EDS resources")
×
518
        }
×
519
}
520

521
func buildClusterLoadAssignment(clusterID string, clusterEndpoints map[uint64]*extensions_ssh.EndpointMetadata) types.Resource {
8✔
522
        endpoints := []*envoy_config_endpoint_v3.LbEndpoint{}
8✔
523
        for streamID, metadata := range clusterEndpoints {
18✔
524
                endpoints = append(endpoints, buildLbEndpoint(streamID, metadata))
10✔
525
        }
10✔
526
        slices.SortFunc(endpoints, compareEndpoints)
8✔
527
        return &envoy_config_endpoint_v3.ClusterLoadAssignment{
8✔
528
                ClusterName: clusterID,
8✔
529
                Endpoints:   []*envoy_config_endpoint_v3.LocalityLbEndpoints{{LbEndpoints: endpoints}},
8✔
530
        }
8✔
531
}
532

533
func compareEndpoints(a, b *envoy_config_endpoint_v3.LbEndpoint) int {
2✔
534
        return cmp.Compare(
2✔
535
                a.GetEndpoint().GetAddress().GetSocketAddress().GetAddress(),
2✔
536
                b.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())
2✔
537
}
2✔
538

539
func buildEndpointMetadata(info portforward.RoutePortForwardInfo) *extensions_ssh.EndpointMetadata {
6✔
540
        serverPort := info.Permission.ServerPort()
6✔
541
        return &extensions_ssh.EndpointMetadata{
6✔
542
                ServerPort: &extensions_ssh.ServerPort{
6✔
543
                        Value:     serverPort.Value,
6✔
544
                        IsDynamic: serverPort.IsDynamic,
6✔
545
                },
6✔
546
                MatchedPermission: &extensions_ssh.PortForwardPermission{
6✔
547
                        RequestedHost: info.Permission.HostMatcher.InputPattern(),
6✔
548
                        RequestedPort: info.Permission.RequestedPort,
6✔
549
                },
6✔
550
        }
6✔
551
}
6✔
552

553
func buildLbEndpoint(streamID uint64, metadata *extensions_ssh.EndpointMetadata) *envoy_config_endpoint_v3.LbEndpoint {
10✔
554
        endpointMdAny, _ := anypb.New(metadata)
10✔
555
        return &envoy_config_endpoint_v3.LbEndpoint{
10✔
556
                HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
10✔
557
                        Endpoint: &envoy_config_endpoint_v3.Endpoint{
10✔
558
                                Address: &corev3.Address{
10✔
559
                                        Address: &corev3.Address_SocketAddress{
10✔
560
                                                SocketAddress: &corev3.SocketAddress{
10✔
561
                                                        Address: fmt.Sprintf("ssh:%d", streamID),
10✔
562
                                                        PortSpecifier: &corev3.SocketAddress_PortValue{
10✔
563
                                                                PortValue: metadata.ServerPort.Value,
10✔
564
                                                        },
10✔
565
                                                },
10✔
566
                                        },
10✔
567
                                },
10✔
568
                        },
10✔
569
                },
10✔
570
                Metadata: &corev3.Metadata{
10✔
571
                        TypedFilterMetadata: map[string]*anypb.Any{
10✔
572
                                "com.pomerium.ssh.endpoint": endpointMdAny,
10✔
573
                        },
10✔
574
                },
10✔
575
                HealthStatus: corev3.HealthStatus_HEALTHY,
10✔
576
        }
10✔
577
}
10✔
578

579
func (sm *StreamManager) reauthLoop(ctx context.Context) {
40✔
580
        for {
125✔
581
                select {
85✔
582
                case <-ctx.Done():
40✔
583
                        return
40✔
584
                case <-sm.reauthC:
45✔
585
                        sm.mu.Lock()
45✔
586
                        snapshot := make([]*activeStream, 0, len(sm.activeStreams))
45✔
587
                        for _, s := range sm.activeStreams {
55✔
588
                                snapshot = append(snapshot, s)
10✔
589
                        }
10✔
590
                        sm.mu.Unlock()
45✔
591

45✔
592
                        for _, s := range snapshot {
55✔
593
                                s.Handler.Reauth()
10✔
594
                        }
10✔
595
                }
596
        }
597
}
598

599
func (sm *StreamManager) endpointsUpdateLoop(ctx context.Context) {
40✔
600
        for {
137✔
601
                select {
97✔
602
                case <-ctx.Done():
40✔
603
                        return
40✔
604
                case update := <-sm.endpointsUpdateQueue:
57✔
605
                        sm.processStreamEndpointsUpdate(update)
57✔
606
                }
607
        }
608
}
609

610
func (sm *StreamManager) terminateStreamLocked(streamID uint64) {
11✔
611
        if sh, ok := sm.activeStreams[streamID]; ok {
22✔
612
                sh.Handler.Terminate(status.Errorf(codes.PermissionDenied, "no longer authorized"))
11✔
613
        }
11✔
614
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc