• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 20083459477

10 Dec 2025 12:50AM UTC coverage: 52.855% (-0.02%) from 52.874%
20083459477

push

github

web-flow
ssh: refactor in-memory policy indexer (#5979)

This refactors some of the internals of the in-memory policy indexer to
properly account for multiple streams authorized to the same session,
but with different auth requests. Within a single session, upstream
tunnel authorized routes are evaluated and cached separately for each
unique AuthRequest within that session. If multiple streams connect and
have the same parameters in the AuthRequest, the cached routes will be
shared as expected.

There is a limit to how many different sets of authorized routes will be
cached (currently 5). If more streams with unique AuthRequests connect,
the oldest cached entries will be evicted.

205 of 371 new or added lines in 3 files covered. (55.26%)

5 existing lines in 4 files now uncovered.

29394 of 55613 relevant lines covered (52.85%)

127.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.01
/pkg/ssh/manager.go
1
package ssh
2

3
import (
4
        "cmp"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "slices"
9
        "strconv"
10
        "sync"
11
        "time"
12

13
        corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
14
        envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
15
        discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
16
        endpointv3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
17
        "github.com/envoyproxy/go-control-plane/pkg/cache/types"
18
        "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
19
        "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
20
        "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
21
        "github.com/rs/zerolog"
22
        "golang.org/x/sync/errgroup"
23
        "google.golang.org/grpc/codes"
24
        "google.golang.org/grpc/status"
25
        "google.golang.org/protobuf/types/known/anypb"
26

27
        extensions_ssh "github.com/pomerium/envoy-custom/api/extensions/filters/network/ssh"
28
        "github.com/pomerium/pomerium/config"
29
        "github.com/pomerium/pomerium/internal/log"
30
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
31
        "github.com/pomerium/pomerium/pkg/grpc/session"
32
        "github.com/pomerium/pomerium/pkg/ssh/portforward"
33
)
34

35
type streamClusterEndpointDiscovery struct {
36
        self     *StreamManager
37
        streamID uint64
38
}
39

40
type streamEndpointsUpdate struct {
41
        streamID uint64
42
        added    map[string]portforward.RoutePortForwardInfo
43
        removed  map[string]struct{}
44
}
45

46
// UpdateClusterEndpoints implements EndpointDiscoveryInterface.
47
func (ed *streamClusterEndpointDiscovery) UpdateClusterEndpoints(added map[string]portforward.RoutePortForwardInfo, removed map[string]struct{}) {
54✔
48
        // run this callback in a separate goroutine, since it can deadlock if called
54✔
49
        // synchronously during startup
54✔
50
        ed.self.endpointsUpdateQueue <- streamEndpointsUpdate{
54✔
51
                streamID: ed.streamID,
54✔
52
                added:    added,
54✔
53
                removed:  removed,
54✔
54
        }
54✔
55
}
54✔
56

57
// PortForwardManager implements EndpointDiscoveryInterface.
58
func (ed *streamClusterEndpointDiscovery) PortForwardManager() *portforward.Manager {
6✔
59
        return ed.self.getPortForwardManagerForStream(ed.streamID)
6✔
60
}
6✔
61

62
var _ EndpointDiscoveryInterface = (*streamClusterEndpointDiscovery)(nil)
63

64
var ErrReauthDone = errors.New("reauth loop done")
65

66
type activeStream struct {
67
        Handler            *StreamHandler
68
        PortForwardManager *portforward.Manager
69
        Session            *string
70
        SessionBindingID   *string
71
        Endpoints          map[string]struct{}
72
}
73

74
type StreamManager struct {
75
        endpointv3.UnimplementedEndpointDiscoveryServiceServer
76
        ready                            chan struct{}
77
        logger                           *zerolog.Logger
78
        auth                             AuthInterface
79
        reauthC                          chan struct{}
80
        initialSessionSyncDone           bool
81
        initialSessionBindingSyncDone    bool
82
        waitForInitialSessionSync        chan struct{}
83
        waitForInitialSessionBindingSync chan struct{}
84

85
        mu sync.Mutex
86

87
        cfg           *config.Config
88
        activeStreams map[uint64]*activeStream
89

90
        // Tracks stream IDs for active sessions
91
        sessionStreams map[string]map[uint64]struct{}
92

93
        // Tracks stream IDs per sessionBindingID for active sessions
94
        bindingStreams map[string]map[uint64]struct{}
95

96
        bindingSyncer *bindingSyncer
97
        // Tracks endpoint stream IDs for clusters
98
        clusterEndpoints     map[string]map[uint64]*extensions_ssh.EndpointMetadata
99
        endpointsUpdateQueue chan streamEndpointsUpdate
100
        edsCache             *cache.LinearCache
101
        edsServer            delta.Server
102
        indexer              PolicyIndexer
103
}
104

105
func (sm *StreamManager) getPortForwardManagerForStream(streamID uint64) *portforward.Manager {
6✔
106
        sm.mu.Lock()
6✔
107
        defer sm.mu.Unlock()
6✔
108
        activeStream := sm.activeStreams[streamID]
6✔
109
        if activeStream == nil || activeStream.PortForwardManager == nil {
6✔
110
                return nil
×
111
        }
×
112
        return activeStream.PortForwardManager // may be nil
6✔
113
}
114

115
// OnDeltaStreamClosed implements delta.Callbacks.
116
func (sm *StreamManager) OnDeltaStreamClosed(int64, *corev3.Node) {
×
117
}
×
118

119
// OnDeltaStreamOpen implements delta.Callbacks.
120
func (sm *StreamManager) OnDeltaStreamOpen(context.Context, int64, string) error {
×
121
        return nil
×
122
}
×
123

124
// OnStreamDeltaRequest implements delta.Callbacks.
125
func (sm *StreamManager) OnStreamDeltaRequest(_ int64, req *discoveryv3.DeltaDiscoveryRequest) error {
×
126
        if len(req.ResourceNamesSubscribe) == 0 {
×
127
                return nil
×
128
        }
×
129
        initialEmptyResources := make(map[string]types.Resource)
×
130
        for _, clusterID := range req.ResourceNamesSubscribe {
×
131
                initialEmptyResources[clusterID] = &envoy_config_endpoint_v3.ClusterLoadAssignment{
×
132
                        ClusterName: clusterID,
×
133
                }
×
134
        }
×
135
        return sm.edsCache.UpdateResources(initialEmptyResources, nil)
×
136
}
137

138
// OnStreamDeltaResponse implements delta.Callbacks.
139
func (sm *StreamManager) OnStreamDeltaResponse(int64, *discoveryv3.DeltaDiscoveryRequest, *discoveryv3.DeltaDiscoveryResponse) {
×
140
}
×
141

142
// DeltaEndpoints implements endpointv3.EndpointDiscoveryServiceServer.
143
func (sm *StreamManager) DeltaEndpoints(stream endpointv3.EndpointDiscoveryService_DeltaEndpointsServer) error {
×
144
        select {
×
145
        case <-stream.Context().Done():
×
146
                return context.Cause(stream.Context())
×
147
        case <-sm.ready:
×
148
        }
149
        log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream started")
×
150
        defer log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream ended")
×
151
        return sm.edsServer.DeltaStreamHandler(stream, resource.EndpointType)
×
152
}
153

154
// ClearRecords implements databroker.SyncerHandler.
155
func (sm *StreamManager) ClearRecords(ctx context.Context) {
145✔
156
        sm.mu.Lock()
145✔
157
        defer sm.mu.Unlock()
145✔
158
        if !sm.initialSessionSyncDone {
290✔
159
                sm.initialSessionSyncDone = true
145✔
160
                close(sm.waitForInitialSessionSync)
145✔
161
                log.Ctx(ctx).Debug().
145✔
162
                        Msg("ssh stream manager: initial sync done")
145✔
163
                return
145✔
164
        }
145✔
UNCOV
165
        for sessionID, streamIDs := range sm.sessionStreams {
×
166
                for streamID := range streamIDs {
×
167
                        log.Ctx(ctx).Debug().
×
168
                                Str("session-id", sessionID).
×
169
                                Uint64("stream-id", streamID).
×
170
                                Msg("terminating stream: databroker sync reset")
×
171
                        sm.terminateStreamLocked(streamID)
×
172
                }
×
173
        }
UNCOV
174
        clear(sm.sessionStreams)
×
175
}
176

177
func (sm *StreamManager) clearRecordsBinding(ctx context.Context) {
40✔
178
        sm.mu.Lock()
40✔
179
        defer sm.mu.Unlock()
40✔
180
        if !sm.initialSessionBindingSyncDone {
79✔
181
                sm.initialSessionBindingSyncDone = true
39✔
182
                close(sm.waitForInitialSessionBindingSync)
39✔
183
                log.Ctx(ctx).Debug().
39✔
184
                        Msg("ssh stream manager: initial sync done")
39✔
185
                return
39✔
186
        }
39✔
187
        for sessionID, streamIDs := range sm.bindingStreams {
3✔
188
                for streamID := range streamIDs {
6✔
189
                        log.Ctx(ctx).Debug().
4✔
190
                                Str("session-binding-id", sessionID).
4✔
191
                                Uint64("stream-id", streamID).
4✔
192
                                Msg("terminating stream: databroker sync reset")
4✔
193
                        sm.terminateStreamLocked(streamID)
4✔
194
                }
4✔
195
        }
196
        clear(sm.sessionStreams)
1✔
197
}
198

199
// GetDataBrokerServiceClient implements databroker.SyncerHandler.
200
func (sm *StreamManager) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
192✔
201
        return sm.auth.GetDataBrokerServiceClient()
192✔
202
}
192✔
203

204
// UpdateRecords implements databroker.SyncerHandler.
205
func (sm *StreamManager) UpdateRecords(ctx context.Context, _ uint64, records []*databroker.Record) {
86✔
206
        sm.mu.Lock()
86✔
207
        defer sm.mu.Unlock()
86✔
208
        for _, record := range records {
133✔
209
                if record.DeletedAt == nil {
88✔
210
                        // New session
41✔
211
                        var s session.Session
41✔
212
                        if err := record.Data.UnmarshalTo(&s); err != nil {
41✔
213
                                log.Ctx(ctx).Err(err).Msg("invalid session object, ignoring")
×
214
                                continue
×
215
                        }
216
                        s.Version = strconv.FormatUint(record.GetVersion(), 10)
41✔
217
                        sm.indexer.OnSessionCreated(&s)
41✔
218
                        continue
41✔
219
                }
220
                // Session was deleted; terminate all of its associated streams
221
                sm.indexer.OnSessionDeleted(record.Id)
6✔
222
                for streamID := range sm.sessionStreams[record.Id] {
9✔
223
                        log.Ctx(ctx).Debug().
3✔
224
                                Str("session-id", record.Id).
3✔
225
                                Uint64("stream-id", streamID).
3✔
226
                                Msg("terminating stream: session revoked")
3✔
227
                        sm.terminateStreamLocked(streamID)
3✔
228
                }
3✔
229
                delete(sm.sessionStreams, record.Id)
6✔
230
        }
231
}
232

233
func (sm *StreamManager) updateRecordsBinding(ctx context.Context, _ uint64, records []*databroker.Record) {
85✔
234
        sm.mu.Lock()
85✔
235
        defer sm.mu.Unlock()
85✔
236
        for _, record := range records {
130✔
237
                if record.DeletedAt == nil {
84✔
238
                        continue
39✔
239
                }
240
                // Session binding was deleted; terminate all of its associated streams
241
                for streamID := range sm.bindingStreams[record.Id] {
12✔
242
                        log.Ctx(ctx).Debug().
6✔
243
                                Str("session-id", record.Id).
6✔
244
                                Uint64("stream-id", streamID).
6✔
245
                                Msg("terminating stream: session binding revoked")
6✔
246
                        sm.terminateStreamLocked(streamID)
6✔
247
                }
6✔
248
                delete(sm.bindingStreams, record.Id)
6✔
249
        }
250
}
251

252
func (sm *StreamManager) OnStreamAuthenticated(ctx context.Context, streamID uint64, req AuthRequest) error {
44✔
253
        if err := sm.waitForInitialSync(ctx); err != nil {
44✔
254
                return err
×
255
        }
×
256
        sm.mu.Lock()
44✔
257
        defer sm.mu.Unlock()
44✔
258
        activeStream := sm.activeStreams[streamID]
44✔
259
        if activeStream.Session != nil || activeStream.SessionBindingID != nil {
44✔
260
                return status.Errorf(codes.Internal, "stream %d already has an associated session", streamID)
×
261
        }
×
262

263
        if sm.sessionStreams[req.SessionID] == nil {
87✔
264
                sm.sessionStreams[req.SessionID] = map[uint64]struct{}{}
43✔
265
        }
43✔
266
        if sm.bindingStreams[req.SessionBindingID] == nil {
85✔
267
                sm.bindingStreams[req.SessionBindingID] = map[uint64]struct{}{}
41✔
268
        }
41✔
269
        sm.sessionStreams[req.SessionID][streamID] = struct{}{}
44✔
270
        sm.bindingStreams[req.SessionBindingID][streamID] = struct{}{}
44✔
271

44✔
272
        activeStream.Session = new(string)
44✔
273
        *activeStream.Session = req.SessionID
44✔
274
        activeStream.SessionBindingID = new(string)
44✔
275
        *activeStream.SessionBindingID = req.SessionBindingID
44✔
276

44✔
277
        activeStream.PortForwardManager.AddUpdateListener(activeStream.Handler)
44✔
278

44✔
279
        sm.indexer.OnStreamAuthenticated(streamID, req)
44✔
280
        return nil
44✔
281
}
282

283
type bindingSyncer struct {
284
        clientHandler func() databroker.DataBrokerServiceClient
285
        clearHandler  func(context.Context)
286
        updateHandler func(context.Context, uint64, []*databroker.Record)
287
}
288

289
var _ databroker.SyncerHandler = (*bindingSyncer)(nil)
290

291
func (sbr *bindingSyncer) ClearRecords(ctx context.Context) {
40✔
292
        sbr.clearHandler(ctx)
40✔
293
}
40✔
294

295
// GetDataBrokerServiceClient implements databroker.SyncerHandler.
296
func (sbr *bindingSyncer) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
102✔
297
        return sbr.clientHandler()
102✔
298
}
102✔
299

300
// UpdateRecords implements databroker.SyncerHandler.
301
func (sbr *bindingSyncer) UpdateRecords(ctx context.Context, serverVersion uint64, records []*databroker.Record) {
85✔
302
        sbr.updateHandler(ctx, serverVersion, records)
85✔
303
}
85✔
304

305
func NewStreamManager(ctx context.Context, auth AuthInterface, indexer PolicyIndexer, cfg *config.Config) *StreamManager {
145✔
306
        sm := &StreamManager{
145✔
307
                logger:                           log.Ctx(ctx),
145✔
308
                auth:                             auth,
145✔
309
                ready:                            make(chan struct{}),
145✔
310
                waitForInitialSessionSync:        make(chan struct{}),
145✔
311
                waitForInitialSessionBindingSync: make(chan struct{}),
145✔
312
                reauthC:                          make(chan struct{}, 1),
145✔
313
                cfg:                              cfg,
145✔
314
                activeStreams:                    map[uint64]*activeStream{},
145✔
315
                sessionStreams:                   map[string]map[uint64]struct{}{},
145✔
316
                clusterEndpoints:                 map[string]map[uint64]*extensions_ssh.EndpointMetadata{},
145✔
317
                edsCache:                         cache.NewLinearCache(resource.EndpointType),
145✔
318
                endpointsUpdateQueue:             make(chan streamEndpointsUpdate, 128),
145✔
319
                bindingStreams:                   map[string]map[uint64]struct{}{},
145✔
320
                indexer:                          indexer,
145✔
321
        }
145✔
322

145✔
323
        bindingSyncer := &bindingSyncer{
145✔
324
                clientHandler: sm.GetDataBrokerServiceClient,
145✔
325
                clearHandler:  sm.clearRecordsBinding,
145✔
326
                updateHandler: sm.updateRecordsBinding,
145✔
327
        }
145✔
328

145✔
329
        sm.bindingSyncer = bindingSyncer
145✔
330

145✔
331
        sm.indexer.ProcessConfigUpdate(cfg)
145✔
332
        return sm
145✔
333
}
145✔
334

335
func (sm *StreamManager) waitForInitialSync(ctx context.Context) error {
47✔
336
        sm.mu.Lock()
47✔
337
        for !sm.initialSessionSyncDone || !sm.initialSessionBindingSyncDone {
685✔
338
                sm.mu.Unlock()
638✔
339
                select {
638✔
340
                case <-sm.waitForInitialSessionSync:
637✔
341
                case <-sm.waitForInitialSessionBindingSync:
1✔
342
                case <-time.After(10 * time.Second):
×
343
                        return errors.New("timed out waiting for initial sync")
×
344
                case <-ctx.Done():
×
345
                        return context.Cause(ctx)
×
346
                }
347
                sm.mu.Lock()
638✔
348
        }
349
        sm.mu.Unlock()
47✔
350
        return nil
47✔
351
}
352

353
func (sm *StreamManager) Run(ctx context.Context) error {
39✔
354
        sm.edsServer = delta.NewServer(ctx, sm.edsCache, sm)
39✔
355
        eg, eCtx := errgroup.WithContext(ctx)
39✔
356
        eg.Go(func() error {
78✔
357
                syncer := databroker.NewSyncer(
39✔
358
                        eCtx,
39✔
359
                        "ssh-auth-session-sync",
39✔
360
                        sm,
39✔
361
                        databroker.WithTypeURL("type.googleapis.com/session.Session"))
39✔
362
                return syncer.Run(eCtx)
39✔
363
        })
39✔
364

365
        eg.Go(func() error {
78✔
366
                syncer := databroker.NewSyncer(
39✔
367
                        eCtx,
39✔
368
                        "ssh-auth-session-binding-sync",
39✔
369
                        sm.bindingSyncer,
39✔
370
                        databroker.WithTypeURL("type.googleapis.com/session.SessionBinding"),
39✔
371
                )
39✔
372
                return syncer.Run(eCtx)
39✔
373
        })
39✔
374

375
        eg.Go(func() error {
78✔
376
                sm.reauthLoop(eCtx)
39✔
377
                return ErrReauthDone
39✔
378
        })
39✔
379
        eg.Go(func() error {
78✔
380
                sm.endpointsUpdateLoop(ctx)
39✔
381
                return nil
39✔
382
        })
39✔
383

384
        close(sm.ready)
39✔
385
        err := eg.Wait()
39✔
386
        if errors.Is(err, ErrReauthDone) {
60✔
387
                return nil
21✔
388
        }
21✔
389

390
        return err
18✔
391
}
392

393
func (sm *StreamManager) OnConfigChange(cfg *config.Config) {
44✔
394
        sm.indexer.ProcessConfigUpdate(cfg)
44✔
395

44✔
396
        // TODO: integrate the re-auth mechanism with the indexer
44✔
397
        select {
44✔
398
        case sm.reauthC <- struct{}{}:
44✔
399
        default:
×
400
        }
401
}
402

403
func (sm *StreamManager) LookupStream(streamID uint64) *StreamHandler {
21✔
404
        sm.mu.Lock()
21✔
405
        defer sm.mu.Unlock()
21✔
406
        if info, ok := sm.activeStreams[streamID]; ok {
40✔
407
                return info.Handler
19✔
408
        }
19✔
409
        return nil
2✔
410
}
411

412
func (sm *StreamManager) NewStreamHandler(
413
        _ context.Context,
414
        downstream *extensions_ssh.DownstreamConnectEvent,
415
) *StreamHandler {
151✔
416
        sm.mu.Lock()
151✔
417
        defer sm.mu.Unlock()
151✔
418
        streamID := downstream.StreamId
151✔
419

151✔
420
        onClose := func() {
289✔
421
                sm.onStreamHandlerClosed(streamID)
138✔
422
        }
138✔
423
        discovery := &streamClusterEndpointDiscovery{
151✔
424
                self:     sm,
151✔
425
                streamID: streamID,
151✔
426
        }
151✔
427
        sh := NewStreamHandler(sm.auth, discovery, sm.cfg, downstream, onClose)
151✔
428
        portForwardMgr := portforward.NewManager()
151✔
429
        sm.activeStreams[streamID] = &activeStream{
151✔
430
                Handler:            sh,
151✔
431
                Endpoints:          map[string]struct{}{},
151✔
432
                PortForwardManager: portForwardMgr,
151✔
433
        }
151✔
434
        sm.indexer.AddStream(streamID, portForwardMgr)
151✔
435
        return sh
151✔
436
}
437

438
func (sm *StreamManager) onStreamHandlerClosed(streamID uint64) {
138✔
439
        sm.mu.Lock()
138✔
440
        defer sm.mu.Unlock()
138✔
441
        info := sm.activeStreams[streamID]
138✔
442
        delete(sm.activeStreams, streamID)
138✔
443

138✔
444
        info.PortForwardManager.RemoveUpdateListener(info.Handler)
138✔
445
        sm.indexer.RemoveStream(streamID)
138✔
446

138✔
447
        if info.Session != nil {
177✔
448
                session := *info.Session
39✔
449
                delete(sm.sessionStreams[session], streamID)
39✔
450
                if len(sm.sessionStreams[session]) == 0 {
78✔
451
                        delete(sm.sessionStreams, session)
39✔
452
                }
39✔
453
        }
454
        if info.SessionBindingID != nil {
177✔
455
                bindingID := *info.SessionBindingID
39✔
456
                delete(sm.bindingStreams[bindingID], streamID)
39✔
457
                if len(sm.bindingStreams[bindingID]) == 0 {
78✔
458
                        delete(sm.bindingStreams, bindingID)
39✔
459
                }
39✔
460
        }
461

462
        if len(info.Endpoints) > 0 {
140✔
463
                sm.logger.Debug().
2✔
464
                        Uint64("stream-id", streamID).
2✔
465
                        Any("endpoints", info.Endpoints).
2✔
466
                        Msg("clearing endpoints for closed stream")
2✔
467
                sm.endpointsUpdateQueue <- streamEndpointsUpdate{
2✔
468
                        streamID: streamID,
2✔
469
                        removed:  info.Endpoints,
2✔
470
                }
2✔
471
        }
2✔
472
}
473

474
func (sm *StreamManager) processStreamEndpointsUpdate(update streamEndpointsUpdate) {
56✔
475
        // TODO: this may not scale well
56✔
476
        sm.mu.Lock()
56✔
477
        defer sm.mu.Unlock()
56✔
478
        streamID := update.streamID
56✔
479

56✔
480
        activeStream := sm.activeStreams[streamID] // can be nil
56✔
481

56✔
482
        toUpdate := map[string]types.Resource{} // *envoy_config_endpoint_v3.LbEndpoint
56✔
483
        toDelete := []string{}
56✔
484
        for clusterID, info := range update.added {
62✔
485
                if activeStream != nil {
12✔
486
                        activeStream.Endpoints[clusterID] = struct{}{}
6✔
487
                }
6✔
488
                if _, ok := sm.clusterEndpoints[clusterID]; !ok {
10✔
489
                        sm.clusterEndpoints[clusterID] = map[uint64]*extensions_ssh.EndpointMetadata{}
4✔
490
                }
4✔
491
                metadata := buildEndpointMetadata(info)
6✔
492
                sm.clusterEndpoints[clusterID][streamID] = metadata
6✔
493
                toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
6✔
494
        }
495

496
        for clusterID := range update.removed {
62✔
497
                if activeStream != nil {
10✔
498
                        delete(activeStream.Endpoints, clusterID)
4✔
499
                }
4✔
500
                delete(sm.clusterEndpoints[clusterID], streamID)
6✔
501
                if len(sm.clusterEndpoints[clusterID]) == 0 {
10✔
502
                        delete(sm.clusterEndpoints, clusterID)
4✔
503
                        toDelete = append(toDelete, clusterID)
4✔
504
                } else {
6✔
505
                        toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
2✔
506
                }
2✔
507
        }
508

509
        if err := sm.edsCache.UpdateResources(toUpdate, toDelete); err != nil {
56✔
510
                sm.logger.Err(err).Msg("error updating EDS resources")
×
511
        }
×
512
}
513

514
func buildClusterLoadAssignment(clusterID string, clusterEndpoints map[uint64]*extensions_ssh.EndpointMetadata) types.Resource {
8✔
515
        endpoints := []*envoy_config_endpoint_v3.LbEndpoint{}
8✔
516
        for streamID, metadata := range clusterEndpoints {
18✔
517
                endpoints = append(endpoints, buildLbEndpoint(streamID, metadata))
10✔
518
        }
10✔
519
        slices.SortFunc(endpoints, compareEndpoints)
8✔
520
        return &envoy_config_endpoint_v3.ClusterLoadAssignment{
8✔
521
                ClusterName: clusterID,
8✔
522
                Endpoints:   []*envoy_config_endpoint_v3.LocalityLbEndpoints{{LbEndpoints: endpoints}},
8✔
523
        }
8✔
524
}
525

526
func compareEndpoints(a, b *envoy_config_endpoint_v3.LbEndpoint) int {
2✔
527
        return cmp.Compare(
2✔
528
                a.GetEndpoint().GetAddress().GetSocketAddress().GetAddress(),
2✔
529
                b.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())
2✔
530
}
2✔
531

532
func buildEndpointMetadata(info portforward.RoutePortForwardInfo) *extensions_ssh.EndpointMetadata {
6✔
533
        serverPort := info.Permission.ServerPort()
6✔
534
        return &extensions_ssh.EndpointMetadata{
6✔
535
                ServerPort: &extensions_ssh.ServerPort{
6✔
536
                        Value:     serverPort.Value,
6✔
537
                        IsDynamic: serverPort.IsDynamic,
6✔
538
                },
6✔
539
                MatchedPermission: &extensions_ssh.PortForwardPermission{
6✔
540
                        RequestedHost: info.Permission.HostMatcher.InputPattern(),
6✔
541
                        RequestedPort: info.Permission.RequestedPort,
6✔
542
                },
6✔
543
        }
6✔
544
}
6✔
545

546
func buildLbEndpoint(streamID uint64, metadata *extensions_ssh.EndpointMetadata) *envoy_config_endpoint_v3.LbEndpoint {
10✔
547
        endpointMdAny, _ := anypb.New(metadata)
10✔
548
        return &envoy_config_endpoint_v3.LbEndpoint{
10✔
549
                HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
10✔
550
                        Endpoint: &envoy_config_endpoint_v3.Endpoint{
10✔
551
                                Address: &corev3.Address{
10✔
552
                                        Address: &corev3.Address_SocketAddress{
10✔
553
                                                SocketAddress: &corev3.SocketAddress{
10✔
554
                                                        Address: fmt.Sprintf("ssh:%d", streamID),
10✔
555
                                                        PortSpecifier: &corev3.SocketAddress_PortValue{
10✔
556
                                                                PortValue: metadata.ServerPort.Value,
10✔
557
                                                        },
10✔
558
                                                },
10✔
559
                                        },
10✔
560
                                },
10✔
561
                        },
10✔
562
                },
10✔
563
                Metadata: &corev3.Metadata{
10✔
564
                        TypedFilterMetadata: map[string]*anypb.Any{
10✔
565
                                "com.pomerium.ssh.endpoint": endpointMdAny,
10✔
566
                        },
10✔
567
                },
10✔
568
                HealthStatus: corev3.HealthStatus_HEALTHY,
10✔
569
        }
10✔
570
}
10✔
571

572
func (sm *StreamManager) reauthLoop(ctx context.Context) {
39✔
573
        for {
122✔
574
                select {
83✔
575
                case <-ctx.Done():
39✔
576
                        return
39✔
577
                case <-sm.reauthC:
44✔
578
                        sm.mu.Lock()
44✔
579
                        snapshot := make([]*activeStream, 0, len(sm.activeStreams))
44✔
580
                        for _, s := range sm.activeStreams {
54✔
581
                                snapshot = append(snapshot, s)
10✔
582
                        }
10✔
583
                        sm.mu.Unlock()
44✔
584

44✔
585
                        for _, s := range snapshot {
54✔
586
                                s.Handler.Reauth()
10✔
587
                        }
10✔
588
                }
589
        }
590
}
591

592
func (sm *StreamManager) endpointsUpdateLoop(ctx context.Context) {
39✔
593
        for {
134✔
594
                select {
95✔
595
                case <-ctx.Done():
39✔
596
                        return
39✔
597
                case update := <-sm.endpointsUpdateQueue:
56✔
598
                        sm.processStreamEndpointsUpdate(update)
56✔
599
                }
600
        }
601
}
602

603
func (sm *StreamManager) terminateStreamLocked(streamID uint64) {
13✔
604
        if sh, ok := sm.activeStreams[streamID]; ok {
26✔
605
                sh.Handler.Terminate(status.Errorf(codes.PermissionDenied, "no longer authorized"))
13✔
606
        }
13✔
607
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc