• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 21890602901

11 Feb 2026 02:36AM UTC coverage: 43.966% (-0.01%) from 43.977%
21890602901

push

github

web-flow
Add custom git merge driver for components.json (#6068)

## Summary

Add a custom git merge driver that automatically resolves merge
conflicts in `internal/version/components.json` by picking the highest
semver version for each component.

This eliminates the recurring merge conflicts that occur when multiple
branches update component versions.

## Related issues

N/A

## User Explanation

No user-facing changes. This is a developer experience improvement that
reduces merge conflicts.

## Checklist

- [x] reference any related issues
- [ ] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [ ] ready for review

---

### Setup (required once after cloning)

```bash
make setup
```

This registers the merge driver in your local git config.

34 of 102 new or added lines in 2 files covered. (33.33%)

15 existing lines in 2 files now uncovered.

31854 of 72452 relevant lines covered (43.97%)

118.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.91
/pkg/ssh/manager.go
1
package ssh
2

3
import (
4
        "cmp"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "slices"
9
        "strconv"
10
        "sync"
11
        "time"
12

13
        corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
14
        envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
15
        datav3 "github.com/envoyproxy/go-control-plane/envoy/data/core/v3"
16
        discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
17
        endpointv3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
18
        "github.com/envoyproxy/go-control-plane/pkg/cache/types"
19
        "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
20
        "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
21
        "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
22
        "github.com/rs/zerolog"
23
        "golang.org/x/sync/errgroup"
24
        "google.golang.org/grpc/codes"
25
        "google.golang.org/grpc/status"
26
        "google.golang.org/protobuf/types/known/anypb"
27
        "google.golang.org/protobuf/types/known/emptypb"
28

29
        extensions_ssh "github.com/pomerium/envoy-custom/api/extensions/filters/network/ssh"
30
        "github.com/pomerium/pomerium/config"
31
        "github.com/pomerium/pomerium/internal/log"
32
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
33
        "github.com/pomerium/pomerium/pkg/grpc/session"
34
        "github.com/pomerium/pomerium/pkg/ssh/cli"
35
        "github.com/pomerium/pomerium/pkg/ssh/portforward"
36
)
37

38
type streamClusterEndpointDiscovery struct {
39
        self     *StreamManager
40
        streamID uint64
41
}
42

43
type streamEndpointsUpdate struct {
44
        streamID uint64
45
        added    map[string]portforward.RoutePortForwardInfo
46
        removed  map[string]struct{}
47
}
48

49
// UpdateClusterEndpoints implements EndpointDiscoveryInterface.
50
func (ed *streamClusterEndpointDiscovery) UpdateClusterEndpoints(added map[string]portforward.RoutePortForwardInfo, removed map[string]struct{}) {
55✔
51
        // run this callback in a separate goroutine, since it can deadlock if called
55✔
52
        // synchronously during startup
55✔
53
        ed.self.endpointsUpdateQueue <- streamEndpointsUpdate{
55✔
54
                streamID: ed.streamID,
55✔
55
                added:    added,
55✔
56
                removed:  removed,
55✔
57
        }
55✔
58
}
55✔
59

60
// PortForwardManager implements EndpointDiscoveryInterface.
61
func (ed *streamClusterEndpointDiscovery) PortForwardManager() *portforward.Manager {
6✔
62
        return ed.self.getPortForwardManagerForStream(ed.streamID)
6✔
63
}
6✔
64

65
var _ EndpointDiscoveryInterface = (*streamClusterEndpointDiscovery)(nil)
66

67
var ErrReauthDone = errors.New("reauth loop done")
68

69
type activeStream struct {
70
        Handler            *StreamHandler
71
        PortForwardManager *portforward.Manager
72
        Session            *string
73
        SessionBindingID   *string
74
        Endpoints          map[string]struct{}
75
}
76

77
type StreamManager struct {
78
        endpointv3.UnimplementedEndpointDiscoveryServiceServer
79
        ready                            chan struct{}
80
        logger                           *zerolog.Logger
81
        auth                             AuthInterface
82
        reauthC                          chan struct{}
83
        initialSessionSyncDone           bool
84
        initialSessionBindingSyncDone    bool
85
        waitForInitialSessionSync        chan struct{}
86
        waitForInitialSessionBindingSync chan struct{}
87

88
        mu sync.Mutex
89

90
        cfg           *config.Config
91
        activeStreams map[uint64]*activeStream
92

93
        // Tracks stream IDs for active sessions
94
        sessionStreams map[string]map[uint64]struct{}
95

96
        // Tracks stream IDs per sessionBindingID for active sessions
97
        bindingStreams map[string]map[uint64]struct{}
98

99
        bindingSyncer *bindingSyncer
100
        // Tracks endpoint stream IDs for clusters
101
        clusterEndpoints     map[string]map[uint64]*extensions_ssh.EndpointMetadata
102
        endpointsUpdateQueue chan streamEndpointsUpdate
103
        edsCache             *cache.LinearCache
104
        edsServer            delta.Server
105
        indexer              PolicyIndexer
106
        cliCtrl              cli.InternalCLIController
107
}
108

109
// LogHealthCheckEvent implements grpc.HealthCheckEventSinkServer.
110
func (sm *StreamManager) LogHealthCheckEvent(ctx context.Context, event *datav3.HealthCheckEvent) (*emptypb.Empty, error) {
×
111
        sm.mu.Lock()
×
112
        defer sm.mu.Unlock()
×
113
        // metadata looks like this:
×
114
        /*
×
115
                {
×
116
                        "healthCheckerType": "TCP",
×
117
                        "host": {
×
118
                                "envoyInternalAddress": {
×
119
                                        "serverListenerName": "ssh:10447317534193689787"
×
120
                                }
×
121
                        },
×
122
                        "clusterName": "route-d5461d93f5ff6efd",
×
123
                        "ejectUnhealthyEvent": {
×
124
                                "failureType": "NETWORK_TIMEOUT"
×
125
                        },
×
126
                        "timestamp": "2025-12-03T22:22:52.742Z",
×
127
                        "metadata": {
×
128
                                "typedFilterMetadata": {
×
129
                                        "com.pomerium.ssh.endpoint": {
×
130
                                                "@type": "type.googleapis.com/pomerium.extensions.ssh.EndpointMetadata",
×
131
                                                "serverPort": {
×
132
                                                        "value": 56301,
×
133
                                                        "isDynamic": true
×
134
                                                },
×
135
                                                "matchedPermission": {
×
136
                                                        "requestedHost": "localhost"
×
137
                                                }
×
138
                                        }
×
139
                                }
×
140
                        },
×
141
                        "locality": {}
×
142
                }
×
143
        */
×
144

×
145
        for streamID := range sm.clusterEndpoints[event.ClusterName] {
×
146
                sm.activeStreams[streamID].Handler.OnClusterHealthUpdate(ctx, event)
×
147
        }
×
148

149
        return &emptypb.Empty{}, nil
×
150
}
151

152
func (sm *StreamManager) getPortForwardManagerForStream(streamID uint64) *portforward.Manager {
6✔
153
        sm.mu.Lock()
6✔
154
        defer sm.mu.Unlock()
6✔
155
        activeStream := sm.activeStreams[streamID]
6✔
156
        if activeStream == nil || activeStream.PortForwardManager == nil {
6✔
157
                return nil
×
158
        }
×
159
        return activeStream.PortForwardManager // may be nil
6✔
160
}
161

162
// OnDeltaStreamClosed implements delta.Callbacks.
163
func (sm *StreamManager) OnDeltaStreamClosed(int64, *corev3.Node) {
×
164
}
×
165

166
// OnDeltaStreamOpen implements delta.Callbacks.
167
func (sm *StreamManager) OnDeltaStreamOpen(context.Context, int64, string) error {
×
168
        return nil
×
169
}
×
170

171
// OnStreamDeltaRequest implements delta.Callbacks.
172
func (sm *StreamManager) OnStreamDeltaRequest(_ int64, req *discoveryv3.DeltaDiscoveryRequest) error {
×
173
        if len(req.ResourceNamesSubscribe) == 0 {
×
174
                return nil
×
175
        }
×
176
        sm.mu.Lock()
×
177
        defer sm.mu.Unlock()
×
178
        initialEmptyResources := make(map[string]types.Resource)
×
179
        for _, clusterID := range req.ResourceNamesSubscribe {
×
180
                if _, ok := sm.clusterEndpoints[clusterID]; !ok {
×
181
                        sm.clusterEndpoints[clusterID] = map[uint64]*extensions_ssh.EndpointMetadata{}
×
182
                        initialEmptyResources[clusterID] = &envoy_config_endpoint_v3.ClusterLoadAssignment{
×
183
                                ClusterName: clusterID,
×
184
                        }
×
185
                }
×
186
        }
187
        return sm.edsCache.UpdateResources(initialEmptyResources, nil)
×
188
}
189

190
// OnStreamDeltaResponse implements delta.Callbacks.
191
func (sm *StreamManager) OnStreamDeltaResponse(int64, *discoveryv3.DeltaDiscoveryRequest, *discoveryv3.DeltaDiscoveryResponse) {
×
192
}
×
193

194
// DeltaEndpoints implements endpointv3.EndpointDiscoveryServiceServer.
195
func (sm *StreamManager) DeltaEndpoints(stream endpointv3.EndpointDiscoveryService_DeltaEndpointsServer) error {
×
196
        select {
×
197
        case <-stream.Context().Done():
×
198
                return context.Cause(stream.Context())
×
199
        case <-sm.ready:
×
200
        }
201
        log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream started")
×
202
        defer log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream ended")
×
203
        return sm.edsServer.DeltaStreamHandler(stream, resource.EndpointType)
×
204
}
205

206
// ClearRecords implements databroker.SyncerHandler.
207
func (sm *StreamManager) ClearRecords(ctx context.Context) {
145✔
208
        sm.mu.Lock()
145✔
209
        defer sm.mu.Unlock()
145✔
210
        if !sm.initialSessionSyncDone {
289✔
211
                sm.initialSessionSyncDone = true
144✔
212
                close(sm.waitForInitialSessionSync)
144✔
213
                log.Ctx(ctx).Debug().
144✔
214
                        Msg("ssh stream manager: initial sync done")
144✔
215
                return
144✔
216
        }
144✔
217
        for sessionID, streamIDs := range sm.sessionStreams {
3✔
218
                for streamID := range streamIDs {
4✔
219
                        log.Ctx(ctx).Debug().
2✔
220
                                Str("session-id", sessionID).
2✔
221
                                Uint64("stream-id", streamID).
2✔
222
                                Msg("terminating stream: databroker sync reset")
2✔
223
                        sm.terminateStreamLocked(streamID)
2✔
224
                }
2✔
225
        }
226
        clear(sm.sessionStreams)
1✔
227
}
228

229
func (sm *StreamManager) clearRecordsBinding(ctx context.Context) {
40✔
230
        sm.mu.Lock()
40✔
231
        defer sm.mu.Unlock()
40✔
232
        if !sm.initialSessionBindingSyncDone {
80✔
233
                sm.initialSessionBindingSyncDone = true
40✔
234
                close(sm.waitForInitialSessionBindingSync)
40✔
235
                log.Ctx(ctx).Debug().
40✔
236
                        Msg("ssh stream manager: initial sync done")
40✔
237
                return
40✔
238
        }
40✔
UNCOV
239
        for sessionID, streamIDs := range sm.bindingStreams {
×
UNCOV
240
                for streamID := range streamIDs {
×
UNCOV
241
                        log.Ctx(ctx).Debug().
×
UNCOV
242
                                Str("session-binding-id", sessionID).
×
UNCOV
243
                                Uint64("stream-id", streamID).
×
UNCOV
244
                                Msg("terminating stream: databroker sync reset")
×
UNCOV
245
                        sm.terminateStreamLocked(streamID)
×
UNCOV
246
                }
×
247
        }
UNCOV
248
        clear(sm.sessionStreams)
×
249
}
250

251
// GetDataBrokerServiceClient implements databroker.SyncerHandler.
252
func (sm *StreamManager) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
211✔
253
        return sm.auth.GetDataBrokerServiceClient()
211✔
254
}
211✔
255

256
// UpdateRecords implements databroker.SyncerHandler.
257
func (sm *StreamManager) UpdateRecords(ctx context.Context, _ uint64, records []*databroker.Record) {
86✔
258
        sm.mu.Lock()
86✔
259
        defer sm.mu.Unlock()
86✔
260
        for _, record := range records {
131✔
261
                if record.DeletedAt == nil {
87✔
262
                        // New session
42✔
263
                        var s session.Session
42✔
264
                        if err := record.Data.UnmarshalTo(&s); err != nil {
42✔
265
                                log.Ctx(ctx).Err(err).Msg("invalid session object, ignoring")
×
266
                                continue
×
267
                        }
268
                        s.Version = strconv.FormatUint(record.GetVersion(), 10)
42✔
269
                        sm.indexer.OnSessionCreated(&s)
42✔
270
                        continue
42✔
271
                }
272
                // Session was deleted; terminate all of its associated streams
273
                sm.indexer.OnSessionDeleted(record.Id)
3✔
274
                for streamID := range sm.sessionStreams[record.Id] {
6✔
275
                        log.Ctx(ctx).Debug().
3✔
276
                                Str("session-id", record.Id).
3✔
277
                                Uint64("stream-id", streamID).
3✔
278
                                Msg("terminating stream: session revoked")
3✔
279
                        sm.terminateStreamLocked(streamID)
3✔
280
                }
3✔
281
                delete(sm.sessionStreams, record.Id)
3✔
282
        }
283
}
284

285
func (sm *StreamManager) updateRecordsBinding(ctx context.Context, _ uint64, records []*databroker.Record) {
86✔
286
        sm.mu.Lock()
86✔
287
        defer sm.mu.Unlock()
86✔
288
        for _, record := range records {
132✔
289
                if record.DeletedAt == nil {
86✔
290
                        continue
40✔
291
                }
292
                // Session binding was deleted; terminate all of its associated streams
293
                for streamID := range sm.bindingStreams[record.Id] {
12✔
294
                        log.Ctx(ctx).Debug().
6✔
295
                                Str("session-id", record.Id).
6✔
296
                                Uint64("stream-id", streamID).
6✔
297
                                Msg("terminating stream: session binding revoked")
6✔
298
                        sm.terminateStreamLocked(streamID)
6✔
299
                }
6✔
300
                delete(sm.bindingStreams, record.Id)
6✔
301
        }
302
}
303

304
func (sm *StreamManager) OnStreamAuthenticated(ctx context.Context, streamID uint64, req AuthRequest) error {
45✔
305
        if err := sm.waitForInitialSync(ctx); err != nil {
45✔
306
                return err
×
307
        }
×
308
        sm.mu.Lock()
45✔
309
        defer sm.mu.Unlock()
45✔
310
        activeStream := sm.activeStreams[streamID]
45✔
311
        if activeStream.Session != nil || activeStream.SessionBindingID != nil {
45✔
312
                return status.Errorf(codes.Internal, "stream %d already has an associated session", streamID)
×
313
        }
×
314

315
        if sm.sessionStreams[req.SessionID] == nil {
89✔
316
                sm.sessionStreams[req.SessionID] = map[uint64]struct{}{}
44✔
317
        }
44✔
318
        if sm.bindingStreams[req.SessionBindingID] == nil {
87✔
319
                sm.bindingStreams[req.SessionBindingID] = map[uint64]struct{}{}
42✔
320
        }
42✔
321
        sm.sessionStreams[req.SessionID][streamID] = struct{}{}
45✔
322
        sm.bindingStreams[req.SessionBindingID][streamID] = struct{}{}
45✔
323

45✔
324
        activeStream.Session = new(string)
45✔
325
        *activeStream.Session = req.SessionID
45✔
326
        activeStream.SessionBindingID = new(string)
45✔
327
        *activeStream.SessionBindingID = req.SessionBindingID
45✔
328

45✔
329
        activeStream.PortForwardManager.AddUpdateListener(activeStream.Handler)
45✔
330

45✔
331
        sm.indexer.OnStreamAuthenticated(streamID, req)
45✔
332
        return nil
45✔
333
}
334

335
type bindingSyncer struct {
336
        clientHandler func() databroker.DataBrokerServiceClient
337
        clearHandler  func(context.Context)
338
        updateHandler func(context.Context, uint64, []*databroker.Record)
339
}
340

341
var _ databroker.SyncerHandler = (*bindingSyncer)(nil)
342

343
func (sbr *bindingSyncer) ClearRecords(ctx context.Context) {
40✔
344
        sbr.clearHandler(ctx)
40✔
345
}
40✔
346

347
// GetDataBrokerServiceClient implements databroker.SyncerHandler.
348
func (sbr *bindingSyncer) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
103✔
349
        return sbr.clientHandler()
103✔
350
}
103✔
351

352
// UpdateRecords implements databroker.SyncerHandler.
353
func (sbr *bindingSyncer) UpdateRecords(ctx context.Context, serverVersion uint64, records []*databroker.Record) {
86✔
354
        sbr.updateHandler(ctx, serverVersion, records)
86✔
355
}
86✔
356

357
func NewStreamManager(ctx context.Context, auth AuthInterface, indexer PolicyIndexer, cliCtrl cli.InternalCLIController, cfg *config.Config) *StreamManager {
144✔
358
        sm := &StreamManager{
144✔
359
                logger:                           log.Ctx(ctx),
144✔
360
                auth:                             auth,
144✔
361
                ready:                            make(chan struct{}),
144✔
362
                waitForInitialSessionSync:        make(chan struct{}),
144✔
363
                waitForInitialSessionBindingSync: make(chan struct{}),
144✔
364
                reauthC:                          make(chan struct{}, 1),
144✔
365
                cfg:                              cfg,
144✔
366
                activeStreams:                    map[uint64]*activeStream{},
144✔
367
                sessionStreams:                   map[string]map[uint64]struct{}{},
144✔
368
                clusterEndpoints:                 map[string]map[uint64]*extensions_ssh.EndpointMetadata{},
144✔
369
                edsCache:                         cache.NewLinearCache(resource.EndpointType),
144✔
370
                endpointsUpdateQueue:             make(chan streamEndpointsUpdate, 128),
144✔
371
                bindingStreams:                   map[string]map[uint64]struct{}{},
144✔
372
                indexer:                          indexer,
144✔
373
                cliCtrl:                          cliCtrl,
144✔
374
        }
144✔
375

144✔
376
        bindingSyncer := &bindingSyncer{
144✔
377
                clientHandler: sm.GetDataBrokerServiceClient,
144✔
378
                clearHandler:  sm.clearRecordsBinding,
144✔
379
                updateHandler: sm.updateRecordsBinding,
144✔
380
        }
144✔
381

144✔
382
        sm.bindingSyncer = bindingSyncer
144✔
383

144✔
384
        sm.indexer.ProcessConfigUpdate(cfg)
144✔
385
        return sm
144✔
386
}
144✔
387

388
func (sm *StreamManager) waitForInitialSync(ctx context.Context) error {
48✔
389
        sm.mu.Lock()
48✔
390
        for !sm.initialSessionSyncDone || !sm.initialSessionBindingSyncDone {
494✔
391
                sm.mu.Unlock()
446✔
392
                select {
446✔
393
                case <-sm.waitForInitialSessionSync:
445✔
394
                case <-sm.waitForInitialSessionBindingSync:
1✔
395
                case <-time.After(10 * time.Second):
×
396
                        return errors.New("timed out waiting for initial sync")
×
397
                case <-ctx.Done():
×
398
                        return context.Cause(ctx)
×
399
                }
400
                sm.mu.Lock()
446✔
401
        }
402
        sm.mu.Unlock()
48✔
403
        return nil
48✔
404
}
405

406
func (sm *StreamManager) Run(ctx context.Context) error {
40✔
407
        sm.edsServer = delta.NewServer(ctx, sm.edsCache, sm)
40✔
408
        eg, eCtx := errgroup.WithContext(ctx)
40✔
409
        eg.Go(func() error {
80✔
410
                syncer := databroker.NewSyncer(
40✔
411
                        eCtx,
40✔
412
                        "ssh-auth-session-sync",
40✔
413
                        sm,
40✔
414
                        databroker.WithTypeURL("type.googleapis.com/session.Session"))
40✔
415
                return syncer.Run(eCtx)
40✔
416
        })
40✔
417

418
        eg.Go(func() error {
80✔
419
                syncer := databroker.NewSyncer(
40✔
420
                        eCtx,
40✔
421
                        "ssh-auth-session-binding-sync",
40✔
422
                        sm.bindingSyncer,
40✔
423
                        databroker.WithTypeURL("type.googleapis.com/session.SessionBinding"),
40✔
424
                )
40✔
425
                return syncer.Run(eCtx)
40✔
426
        })
40✔
427

428
        eg.Go(func() error {
80✔
429
                sm.reauthLoop(eCtx)
40✔
430
                return ErrReauthDone
40✔
431
        })
40✔
432
        eg.Go(func() error {
80✔
433
                sm.endpointsUpdateLoop(ctx)
40✔
434
                return nil
40✔
435
        })
40✔
436

437
        close(sm.ready)
40✔
438
        err := eg.Wait()
40✔
439
        if errors.Is(err, ErrReauthDone) {
68✔
440
                return nil
28✔
441
        }
28✔
442

443
        return err
12✔
444
}
445

446
func (sm *StreamManager) OnConfigChange(cfg *config.Config) {
45✔
447
        sm.indexer.ProcessConfigUpdate(cfg)
45✔
448

45✔
449
        // TODO: integrate the re-auth mechanism with the indexer
45✔
450
        select {
45✔
451
        case sm.reauthC <- struct{}{}:
45✔
452
        default:
×
453
        }
454
}
455

456
func (sm *StreamManager) LookupStream(streamID uint64) *StreamHandler {
21✔
457
        sm.mu.Lock()
21✔
458
        defer sm.mu.Unlock()
21✔
459
        if info, ok := sm.activeStreams[streamID]; ok {
40✔
460
                return info.Handler
19✔
461
        }
19✔
462
        return nil
2✔
463
}
464

465
func (sm *StreamManager) NewStreamHandler(
466
        _ context.Context,
467
        downstream *extensions_ssh.DownstreamConnectEvent,
468
) *StreamHandler {
150✔
469
        sm.mu.Lock()
150✔
470
        defer sm.mu.Unlock()
150✔
471
        streamID := downstream.StreamId
150✔
472

150✔
473
        onClose := func() {
289✔
474
                sm.onStreamHandlerClosed(streamID)
139✔
475
        }
139✔
476
        discovery := &streamClusterEndpointDiscovery{
150✔
477
                self:     sm,
150✔
478
                streamID: streamID,
150✔
479
        }
150✔
480
        sh := NewStreamHandler(sm.auth, discovery, sm.cliCtrl, sm.cfg, downstream, onClose)
150✔
481
        portForwardMgr := portforward.NewManager()
150✔
482
        sm.activeStreams[streamID] = &activeStream{
150✔
483
                Handler:            sh,
150✔
484
                Endpoints:          map[string]struct{}{},
150✔
485
                PortForwardManager: portForwardMgr,
150✔
486
        }
150✔
487
        sm.indexer.AddStream(streamID, portForwardMgr)
150✔
488
        return sh
150✔
489
}
490

491
func (sm *StreamManager) onStreamHandlerClosed(streamID uint64) {
139✔
492
        sm.mu.Lock()
139✔
493
        defer sm.mu.Unlock()
139✔
494
        info := sm.activeStreams[streamID]
139✔
495
        delete(sm.activeStreams, streamID)
139✔
496

139✔
497
        info.PortForwardManager.RemoveUpdateListener(info.Handler)
139✔
498
        sm.indexer.RemoveStream(streamID)
139✔
499

139✔
500
        if info.Session != nil {
179✔
501
                session := *info.Session
40✔
502
                delete(sm.sessionStreams[session], streamID)
40✔
503
                if len(sm.sessionStreams[session]) == 0 {
80✔
504
                        delete(sm.sessionStreams, session)
40✔
505
                }
40✔
506
        }
507
        if info.SessionBindingID != nil {
179✔
508
                bindingID := *info.SessionBindingID
40✔
509
                delete(sm.bindingStreams[bindingID], streamID)
40✔
510
                if len(sm.bindingStreams[bindingID]) == 0 {
80✔
511
                        delete(sm.bindingStreams, bindingID)
40✔
512
                }
40✔
513
        }
514

515
        if len(info.Endpoints) > 0 {
141✔
516
                sm.logger.Debug().
2✔
517
                        Uint64("stream-id", streamID).
2✔
518
                        Any("endpoints", info.Endpoints).
2✔
519
                        Msg("clearing endpoints for closed stream")
2✔
520
                sm.endpointsUpdateQueue <- streamEndpointsUpdate{
2✔
521
                        streamID: streamID,
2✔
522
                        removed:  info.Endpoints,
2✔
523
                }
2✔
524
        }
2✔
525
}
526

527
func (sm *StreamManager) processStreamEndpointsUpdate(update streamEndpointsUpdate) {
57✔
528
        sm.mu.Lock()
57✔
529
        defer sm.mu.Unlock()
57✔
530
        streamID := update.streamID
57✔
531

57✔
532
        activeStream := sm.activeStreams[streamID] // can be nil
57✔
533

57✔
534
        toUpdate := map[string]types.Resource{} // *envoy_config_endpoint_v3.LbEndpoint
57✔
535
        var toDelete []string
57✔
536
        for clusterID, info := range update.added {
63✔
537
                if activeStream != nil {
12✔
538
                        activeStream.Endpoints[clusterID] = struct{}{}
6✔
539
                }
6✔
540
                if _, ok := sm.clusterEndpoints[clusterID]; !ok {
10✔
541
                        sm.clusterEndpoints[clusterID] = map[uint64]*extensions_ssh.EndpointMetadata{}
4✔
542
                }
4✔
543
                sm.clusterEndpoints[clusterID][streamID] = buildEndpointMetadata(info)
6✔
544
                toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
6✔
545
        }
546
        for clusterID := range update.removed {
63✔
547
                if activeStream != nil {
10✔
548
                        delete(activeStream.Endpoints, clusterID)
4✔
549
                }
4✔
550
                delete(sm.clusterEndpoints[clusterID], streamID)
6✔
551
                if len(sm.clusterEndpoints[clusterID]) == 0 {
10✔
552
                        // No more endpoints for this cluster, so delete the resource. The cluster
4✔
553
                        // will handle this by clearing the endpoints.
4✔
554
                        toDelete = append(toDelete, clusterID)
4✔
555
                        delete(sm.clusterEndpoints, clusterID)
4✔
556
                } else {
6✔
557
                        // There are still endpoints
2✔
558
                        toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
2✔
559
                }
2✔
560
        }
561

562
        if err := sm.edsCache.UpdateResources(toUpdate, toDelete); err != nil {
57✔
563
                sm.logger.Err(err).Msg("error updating EDS resources")
×
564
        }
×
565
}
566

567
func buildClusterLoadAssignment(clusterID string, clusterEndpoints map[uint64]*extensions_ssh.EndpointMetadata) types.Resource {
8✔
568
        endpoints := []*envoy_config_endpoint_v3.LbEndpoint{}
8✔
569
        for streamID, metadata := range clusterEndpoints {
18✔
570
                endpoints = append(endpoints, buildLbEndpoint(streamID, metadata))
10✔
571
        }
10✔
572
        slices.SortFunc(endpoints, compareEndpoints)
8✔
573
        return &envoy_config_endpoint_v3.ClusterLoadAssignment{
8✔
574
                ClusterName: clusterID,
8✔
575
                Endpoints:   []*envoy_config_endpoint_v3.LocalityLbEndpoints{{LbEndpoints: endpoints}},
8✔
576
        }
8✔
577
}
578

579
func compareEndpoints(a, b *envoy_config_endpoint_v3.LbEndpoint) int {
2✔
580
        return cmp.Compare(
2✔
581
                a.GetEndpoint().GetAddress().GetSocketAddress().GetAddress(),
2✔
582
                b.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())
2✔
583
}
2✔
584

585
func buildEndpointMetadata(info portforward.RoutePortForwardInfo) *extensions_ssh.EndpointMetadata {
6✔
586
        serverPort := info.Permission.ServerPort()
6✔
587
        return &extensions_ssh.EndpointMetadata{
6✔
588
                ServerPort: &extensions_ssh.ServerPort{
6✔
589
                        Value:     serverPort.Value,
6✔
590
                        IsDynamic: serverPort.IsDynamic,
6✔
591
                },
6✔
592
                MatchedPermission: &extensions_ssh.PortForwardPermission{
6✔
593
                        RequestedHost: info.Permission.HostMatcher.InputPattern(),
6✔
594
                        RequestedPort: info.Permission.RequestedPort,
6✔
595
                },
6✔
596
        }
6✔
597
}
6✔
598

599
func buildLbEndpoint(streamID uint64, metadata *extensions_ssh.EndpointMetadata) *envoy_config_endpoint_v3.LbEndpoint {
10✔
600
        endpointMdAny, _ := anypb.New(metadata)
10✔
601
        return &envoy_config_endpoint_v3.LbEndpoint{
10✔
602
                HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
10✔
603
                        Endpoint: &envoy_config_endpoint_v3.Endpoint{
10✔
604
                                Address: &corev3.Address{
10✔
605
                                        Address: &corev3.Address_SocketAddress{
10✔
606
                                                SocketAddress: &corev3.SocketAddress{
10✔
607
                                                        Address: fmt.Sprintf("ssh:%d", streamID),
10✔
608
                                                        PortSpecifier: &corev3.SocketAddress_PortValue{
10✔
609
                                                                PortValue: metadata.ServerPort.Value,
10✔
610
                                                        },
10✔
611
                                                },
10✔
612
                                        },
10✔
613
                                },
10✔
614
                                HealthCheckConfig: &envoy_config_endpoint_v3.Endpoint_HealthCheckConfig{},
10✔
615
                        },
10✔
616
                },
10✔
617
                Metadata: &corev3.Metadata{
10✔
618
                        TypedFilterMetadata: map[string]*anypb.Any{
10✔
619
                                "com.pomerium.ssh.endpoint": endpointMdAny,
10✔
620
                        },
10✔
621
                },
10✔
622
                HealthStatus: corev3.HealthStatus_UNKNOWN,
10✔
623
        }
10✔
624
}
10✔
625

626
func (sm *StreamManager) reauthLoop(ctx context.Context) {
40✔
627
        for {
125✔
628
                select {
85✔
629
                case <-ctx.Done():
40✔
630
                        return
40✔
631
                case <-sm.reauthC:
45✔
632
                        sm.mu.Lock()
45✔
633
                        snapshot := make([]*activeStream, 0, len(sm.activeStreams))
45✔
634
                        for _, s := range sm.activeStreams {
55✔
635
                                snapshot = append(snapshot, s)
10✔
636
                        }
10✔
637
                        sm.mu.Unlock()
45✔
638

45✔
639
                        for _, s := range snapshot {
55✔
640
                                s.Handler.Reauth()
10✔
641
                        }
10✔
642
                }
643
        }
644
}
645

646
func (sm *StreamManager) endpointsUpdateLoop(ctx context.Context) {
40✔
647
        for {
137✔
648
                select {
97✔
649
                case <-ctx.Done():
40✔
650
                        return
40✔
651
                case update := <-sm.endpointsUpdateQueue:
57✔
652
                        sm.processStreamEndpointsUpdate(update)
57✔
653
                }
654
        }
655
}
656

657
func (sm *StreamManager) terminateStreamLocked(streamID uint64) {
11✔
658
        if sh, ok := sm.activeStreams[streamID]; ok {
22✔
659
                sh.Handler.Terminate(status.Errorf(codes.PermissionDenied, "no longer authorized"))
11✔
660
        }
11✔
661
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc