• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 19149648094

06 Nov 2025 08:57PM UTC coverage: 56.177% (+0.1%) from 56.051%
19149648094

push

github

web-flow
ssh: initial implementation of reverse tunnel EDS (#5915)

This implements the reverse tunnel Endpoint Discovery Service endpoint. 

There are still more tests to be written but those will be added in a
follow-up.

246 of 308 new or added lines in 7 files covered. (79.87%)

12 existing lines in 5 files now uncovered.

28467 of 50674 relevant lines covered (56.18%)

96.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.66
/pkg/ssh/manager.go
1
package ssh
2

3
import (
4
        "cmp"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "slices"
9
        "sync"
10
        "time"
11

12
        corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
13
        envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
14
        discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
15
        endpointv3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
16
        "github.com/envoyproxy/go-control-plane/pkg/cache/types"
17
        "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
18
        "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
19
        "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
20
        "github.com/rs/zerolog"
21
        "golang.org/x/sync/errgroup"
22
        "google.golang.org/grpc/codes"
23
        "google.golang.org/grpc/status"
24
        "google.golang.org/protobuf/types/known/anypb"
25

26
        extensions_ssh "github.com/pomerium/envoy-custom/api/extensions/filters/network/ssh"
27
        "github.com/pomerium/pomerium/config"
28
        "github.com/pomerium/pomerium/internal/log"
29
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
30
        "github.com/pomerium/pomerium/pkg/ssh/portforward"
31
)
32

33
type streamClusterEndpointDiscovery struct {
34
        self     *StreamManager
35
        streamID uint64
36
}
37

38
type streamEndpointsUpdate struct {
39
        streamID uint64
40
        added    map[string]portforward.RoutePortForwardInfo
41
        removed  map[string]struct{}
42
}
43

44
func (ed *streamClusterEndpointDiscovery) UpdateClusterEndpoints(added map[string]portforward.RoutePortForwardInfo, removed map[string]struct{}) {
284✔
45
        // run this callback in a separate goroutine, since it can deadlock if called
284✔
46
        // synchronously during startup
284✔
47
        ed.self.endpointsUpdateQueue <- streamEndpointsUpdate{
284✔
48
                streamID: ed.streamID,
284✔
49
                added:    added,
284✔
50
                removed:  removed,
284✔
51
        }
284✔
52
}
284✔
53

54
type activeStream struct {
55
        Handler   *StreamHandler
56
        Session   *string
57
        Endpoints map[string]struct{}
58
}
59

60
type StreamManager struct {
61
        endpointv3.UnimplementedEndpointDiscoveryServiceServer
62
        logger             *zerolog.Logger
63
        auth               AuthInterface
64
        reauthC            chan struct{}
65
        initialSyncDone    bool
66
        waitForInitialSync chan struct{}
67

68
        mu sync.Mutex
69

70
        cfg           *config.Config
71
        activeStreams map[uint64]*activeStream
72

73
        // Tracks stream IDs for active sessions
74
        sessionStreams map[string]map[uint64]struct{}
75
        // Tracks endpoint stream IDs for clusters
76
        clusterEndpoints     map[string]map[uint64]*extensions_ssh.EndpointMetadata
77
        endpointsUpdateQueue chan streamEndpointsUpdate
78
        edsCache             *cache.LinearCache
79
        edsServer            delta.Server
80
}
81

82
// OnDeltaStreamClosed implements delta.Callbacks.
NEW
83
func (sm *StreamManager) OnDeltaStreamClosed(int64, *corev3.Node) {
×
NEW
84
}
×
85

86
// OnDeltaStreamOpen implements delta.Callbacks.
NEW
87
func (sm *StreamManager) OnDeltaStreamOpen(context.Context, int64, string) error {
×
NEW
88
        return nil
×
NEW
89
}
×
90

91
// OnStreamDeltaRequest implements delta.Callbacks.
NEW
92
func (sm *StreamManager) OnStreamDeltaRequest(_ int64, req *discoveryv3.DeltaDiscoveryRequest) error {
×
NEW
93
        if len(req.ResourceNamesSubscribe) == 0 {
×
NEW
94
                return nil
×
NEW
95
        }
×
NEW
96
        initialEmptyResources := make(map[string]types.Resource)
×
NEW
97
        for _, clusterID := range req.ResourceNamesSubscribe {
×
NEW
98
                initialEmptyResources[clusterID] = &envoy_config_endpoint_v3.ClusterLoadAssignment{
×
NEW
99
                        ClusterName: clusterID,
×
NEW
100
                }
×
NEW
101
        }
×
NEW
102
        return sm.edsCache.UpdateResources(initialEmptyResources, nil)
×
103
}
104

105
// OnStreamDeltaResponse implements delta.Callbacks.
NEW
106
func (sm *StreamManager) OnStreamDeltaResponse(int64, *discoveryv3.DeltaDiscoveryRequest, *discoveryv3.DeltaDiscoveryResponse) {
×
NEW
107
}
×
108

109
// DeltaEndpoints implements endpointv3.EndpointDiscoveryServiceServer.
NEW
110
func (sm *StreamManager) DeltaEndpoints(stream endpointv3.EndpointDiscoveryService_DeltaEndpointsServer) error {
×
NEW
111
        log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream started")
×
NEW
112
        defer log.Ctx(stream.Context()).Debug().Msg("delta endpoint stream ended")
×
NEW
113
        return sm.edsServer.DeltaStreamHandler(stream, resource.EndpointType)
×
UNCOV
114
}
×
115

116
// ClearRecords implements databroker.SyncerHandler.
117
func (sm *StreamManager) ClearRecords(ctx context.Context) {
164✔
118
        sm.mu.Lock()
164✔
119
        defer sm.mu.Unlock()
164✔
120
        if !sm.initialSyncDone {
327✔
121
                sm.initialSyncDone = true
163✔
122
                close(sm.waitForInitialSync)
163✔
123
                log.Ctx(ctx).Debug().
163✔
124
                        Msg("ssh stream manager: initial sync done")
163✔
125
                return
163✔
126
        }
163✔
127
        for sessionID, streamIDs := range sm.sessionStreams {
3✔
128
                for streamID := range streamIDs {
4✔
129
                        log.Ctx(ctx).Debug().
2✔
130
                                Str("session-id", sessionID).
2✔
131
                                Uint64("stream-id", streamID).
2✔
132
                                Msg("terminating stream: databroker sync reset")
2✔
133
                        sm.terminateStreamLocked(streamID)
2✔
134
                }
2✔
135
        }
136
        clear(sm.sessionStreams)
1✔
137
}
138

139
// GetDataBrokerServiceClient implements databroker.SyncerHandler.
140
func (sm *StreamManager) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
102✔
141
        return sm.auth.GetDataBrokerServiceClient()
102✔
142
}
102✔
143

144
// UpdateRecords implements databroker.SyncerHandler.
145
func (sm *StreamManager) UpdateRecords(ctx context.Context, _ uint64, records []*databroker.Record) {
73✔
146
        sm.mu.Lock()
73✔
147
        defer sm.mu.Unlock()
73✔
148
        for _, record := range records {
112✔
149
                if record.DeletedAt == nil {
65✔
150
                        continue
26✔
151
                }
152
                // a session was deleted; terminate all of its associated streams
153
                for streamID := range sm.sessionStreams[record.Id] {
22✔
154
                        log.Ctx(ctx).Debug().
9✔
155
                                Str("session-id", record.Id).
9✔
156
                                Uint64("stream-id", streamID).
9✔
157
                                Msg("terminating stream: session revoked")
9✔
158
                        sm.terminateStreamLocked(streamID)
9✔
159
                }
9✔
160
                delete(sm.sessionStreams, record.Id)
13✔
161
        }
162
}
163

164
func (sm *StreamManager) SetSessionIDForStream(ctx context.Context, streamID uint64, sessionID string) error {
47✔
165
        sm.mu.Lock()
47✔
166
        for !sm.initialSyncDone {
59✔
167
                sm.mu.Unlock()
12✔
168
                select {
12✔
169
                case <-sm.waitForInitialSync:
12✔
170
                case <-time.After(10 * time.Second):
×
171
                        return errors.New("timed out waiting for initial sync")
×
NEW
172
                case <-ctx.Done():
×
NEW
173
                        return context.Cause(ctx)
×
174
                }
175
                sm.mu.Lock()
12✔
176
        }
177
        defer sm.mu.Unlock()
47✔
178
        if sm.sessionStreams[sessionID] == nil {
87✔
179
                sm.sessionStreams[sessionID] = map[uint64]struct{}{}
40✔
180
        }
40✔
181
        sm.sessionStreams[sessionID][streamID] = struct{}{}
47✔
182
        sm.activeStreams[streamID].Session = &sessionID
47✔
183
        return nil
47✔
184
}
185

186
func NewStreamManager(ctx context.Context, auth AuthInterface, cfg *config.Config) *StreamManager {
163✔
187
        sm := &StreamManager{
163✔
188
                logger:               log.Ctx(ctx),
163✔
189
                auth:                 auth,
163✔
190
                waitForInitialSync:   make(chan struct{}),
163✔
191
                reauthC:              make(chan struct{}, 1),
163✔
192
                cfg:                  cfg,
163✔
193
                activeStreams:        map[uint64]*activeStream{},
163✔
194
                sessionStreams:       map[string]map[uint64]struct{}{},
163✔
195
                clusterEndpoints:     map[string]map[uint64]*extensions_ssh.EndpointMetadata{},
163✔
196
                edsCache:             cache.NewLinearCache(resource.EndpointType),
163✔
197
                endpointsUpdateQueue: make(chan streamEndpointsUpdate, 128),
163✔
198
        }
163✔
199
        return sm
163✔
200
}
163✔
201

202
func (sm *StreamManager) Run(ctx context.Context) error {
56✔
203
        sm.edsServer = delta.NewServer(ctx, sm.edsCache, sm)
56✔
204

56✔
205
        syncer := databroker.NewSyncer(ctx, "ssh-auth-session-sync", sm,
56✔
206
                databroker.WithTypeURL("type.googleapis.com/session.Session"))
56✔
207
        eg, ctx := errgroup.WithContext(ctx)
56✔
208
        eg.Go(func() error {
112✔
209
                sm.reauthLoop(ctx)
56✔
210
                return nil
56✔
211
        })
56✔
212
        eg.Go(func() error {
112✔
213
                sm.endpointsUpdateLoop(ctx)
56✔
214
                return nil
56✔
215
        })
56✔
216
        eg.Go(func() error {
112✔
217
                return syncer.Run(ctx)
56✔
218
        })
56✔
219
        return eg.Wait()
56✔
220
}
221

222
func (sm *StreamManager) OnConfigChange(cfg *config.Config) {
62✔
223
        sm.mu.Lock()
62✔
224
        sm.cfg = cfg
62✔
225
        for _, s := range sm.activeStreams {
108✔
226
                s.Handler.portForwards.OnConfigUpdate(cfg)
46✔
227
        }
46✔
228
        sm.mu.Unlock()
62✔
229

62✔
230
        select {
62✔
231
        case sm.reauthC <- struct{}{}:
62✔
232
        default:
×
233
        }
234
}
235

236
func (sm *StreamManager) LookupStream(streamID uint64) *StreamHandler {
21✔
237
        sm.mu.Lock()
21✔
238
        defer sm.mu.Unlock()
21✔
239
        if info, ok := sm.activeStreams[streamID]; ok {
40✔
240
                return info.Handler
19✔
241
        }
19✔
242
        return nil
2✔
243
}
244

245
func (sm *StreamManager) NewStreamHandler(
246
        _ context.Context,
247
        downstream *extensions_ssh.DownstreamConnectEvent,
248
) *StreamHandler {
178✔
249
        sm.mu.Lock()
178✔
250
        defer sm.mu.Unlock()
178✔
251
        streamID := downstream.StreamId
178✔
252

178✔
253
        onClose := func() {
343✔
254
                sm.onStreamHandlerClosed(streamID)
165✔
255
        }
165✔
256
        discovery := &streamClusterEndpointDiscovery{
178✔
257
                self:     sm,
178✔
258
                streamID: streamID,
178✔
259
        }
178✔
260
        sh := NewStreamHandler(sm.auth, discovery, sm.cfg, downstream, onClose)
178✔
261
        sm.activeStreams[streamID] = &activeStream{
178✔
262
                Handler:   sh,
178✔
263
                Endpoints: map[string]struct{}{},
178✔
264
        }
178✔
265
        return sh
178✔
266
}
267

268
func (sm *StreamManager) onStreamHandlerClosed(streamID uint64) {
165✔
269
        sm.mu.Lock()
165✔
270
        defer sm.mu.Unlock()
165✔
271
        info := sm.activeStreams[streamID]
165✔
272
        delete(sm.activeStreams, streamID)
165✔
273
        if info.Session != nil {
201✔
274
                session := *info.Session
36✔
275
                delete(sm.sessionStreams[session], streamID)
36✔
276
                if len(sm.sessionStreams[session]) == 0 {
72✔
277
                        delete(sm.sessionStreams, session)
36✔
278
                }
36✔
279
        }
280

281
        if len(info.Endpoints) > 0 {
185✔
282
                sm.logger.Debug().
20✔
283
                        Uint64("stream-id", streamID).
20✔
284
                        Any("endpoints", info.Endpoints).
20✔
285
                        Msg("clearing endpoints for closed stream")
20✔
286
                sm.endpointsUpdateQueue <- streamEndpointsUpdate{
20✔
287
                        streamID: streamID,
20✔
288
                        removed:  info.Endpoints,
20✔
289
                }
20✔
290
        }
20✔
291
}
292

293
func (sm *StreamManager) processStreamEndpointsUpdate(update streamEndpointsUpdate) {
192✔
294
        // TODO: this may not scale well
192✔
295
        sm.mu.Lock()
192✔
296
        defer sm.mu.Unlock()
192✔
297
        streamID := update.streamID
192✔
298

192✔
299
        activeStream := sm.activeStreams[streamID] // can be nil
192✔
300

192✔
301
        toUpdate := map[string]types.Resource{} // *envoy_config_endpoint_v3.LbEndpoint
192✔
302
        toDelete := []string{}
192✔
303
        for clusterID, info := range update.added {
252✔
304
                if activeStream != nil {
120✔
305
                        activeStream.Endpoints[clusterID] = struct{}{}
60✔
306
                }
60✔
307
                if _, ok := sm.clusterEndpoints[clusterID]; !ok {
100✔
308
                        sm.clusterEndpoints[clusterID] = map[uint64]*extensions_ssh.EndpointMetadata{}
40✔
309
                }
40✔
310
                metadata := buildEndpointMetadata(info)
60✔
311
                sm.clusterEndpoints[clusterID][streamID] = metadata
60✔
312
                toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
60✔
313
        }
314

315
        for clusterID := range update.removed {
252✔
316
                if activeStream != nil {
100✔
317
                        delete(activeStream.Endpoints, clusterID)
40✔
318
                }
40✔
319
                delete(sm.clusterEndpoints[clusterID], streamID)
60✔
320
                if len(sm.clusterEndpoints[clusterID]) == 0 {
100✔
321
                        delete(sm.clusterEndpoints, clusterID)
40✔
322
                        toDelete = append(toDelete, clusterID)
40✔
323
                } else {
60✔
324
                        toUpdate[clusterID] = buildClusterLoadAssignment(clusterID, sm.clusterEndpoints[clusterID])
20✔
325
                }
20✔
326
        }
327

328
        if err := sm.edsCache.UpdateResources(toUpdate, toDelete); err != nil {
192✔
NEW
329
                sm.logger.Err(err).Msg("error updating EDS resources")
×
NEW
330
        }
×
331
}
332

333
func buildClusterLoadAssignment(clusterID string, clusterEndpoints map[uint64]*extensions_ssh.EndpointMetadata) types.Resource {
80✔
334
        endpoints := []*envoy_config_endpoint_v3.LbEndpoint{}
80✔
335
        for streamID, metadata := range clusterEndpoints {
180✔
336
                endpoints = append(endpoints, buildLbEndpoint(streamID, metadata))
100✔
337
        }
100✔
338
        slices.SortFunc(endpoints, compareEndpoints)
80✔
339
        return &envoy_config_endpoint_v3.ClusterLoadAssignment{
80✔
340
                ClusterName: clusterID,
80✔
341
                Endpoints:   []*envoy_config_endpoint_v3.LocalityLbEndpoints{{LbEndpoints: endpoints}},
80✔
342
        }
80✔
343
}
344

345
func compareEndpoints(a, b *envoy_config_endpoint_v3.LbEndpoint) int {
20✔
346
        return cmp.Compare(
20✔
347
                a.GetEndpoint().GetAddress().GetSocketAddress().GetAddress(),
20✔
348
                b.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())
20✔
349
}
20✔
350

351
func buildEndpointMetadata(info portforward.RoutePortForwardInfo) *extensions_ssh.EndpointMetadata {
60✔
352
        serverPort := info.Permission.ServerPort()
60✔
353
        return &extensions_ssh.EndpointMetadata{
60✔
354
                ServerPort: &extensions_ssh.ServerPort{
60✔
355
                        Value:     serverPort.Value,
60✔
356
                        IsDynamic: serverPort.IsDynamic,
60✔
357
                },
60✔
358
                MatchedPermission: &extensions_ssh.PortForwardPermission{
60✔
359
                        RequestedHost: info.Permission.HostMatcher.InputPattern(),
60✔
360
                        RequestedPort: info.Permission.RequestedPort,
60✔
361
                },
60✔
362
        }
60✔
363
}
60✔
364

365
func buildLbEndpoint(streamID uint64, metadata *extensions_ssh.EndpointMetadata) *envoy_config_endpoint_v3.LbEndpoint {
100✔
366
        endpointMdAny, _ := anypb.New(metadata)
100✔
367
        return &envoy_config_endpoint_v3.LbEndpoint{
100✔
368
                HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
100✔
369
                        Endpoint: &envoy_config_endpoint_v3.Endpoint{
100✔
370
                                Address: &corev3.Address{
100✔
371
                                        Address: &corev3.Address_SocketAddress{
100✔
372
                                                SocketAddress: &corev3.SocketAddress{
100✔
373
                                                        Address: fmt.Sprintf("ssh:%d", streamID),
100✔
374
                                                        PortSpecifier: &corev3.SocketAddress_PortValue{
100✔
375
                                                                PortValue: metadata.ServerPort.Value,
100✔
376
                                                        },
100✔
377
                                                },
100✔
378
                                        },
100✔
379
                                },
100✔
380
                        },
100✔
381
                },
100✔
382
                Metadata: &corev3.Metadata{
100✔
383
                        TypedFilterMetadata: map[string]*anypb.Any{
100✔
384
                                "com.pomerium.ssh.endpoint": endpointMdAny,
100✔
385
                        },
100✔
386
                },
100✔
387
                HealthStatus: corev3.HealthStatus_HEALTHY,
100✔
388
        }
100✔
389
}
100✔
390

391
func (sm *StreamManager) reauthLoop(ctx context.Context) {
56✔
392
        for {
174✔
393
                select {
118✔
394
                case <-ctx.Done():
56✔
395
                        return
56✔
396
                case <-sm.reauthC:
62✔
397
                        sm.mu.Lock()
62✔
398
                        snapshot := make([]*activeStream, 0, len(sm.activeStreams))
62✔
399
                        for _, s := range sm.activeStreams {
108✔
400
                                snapshot = append(snapshot, s)
46✔
401
                        }
46✔
402
                        sm.mu.Unlock()
62✔
403

62✔
404
                        for _, s := range snapshot {
108✔
405
                                s.Handler.Reauth()
46✔
406
                        }
46✔
407
                }
408
        }
409
}
410

411
func (sm *StreamManager) endpointsUpdateLoop(ctx context.Context) {
56✔
412
        for {
304✔
413
                select {
248✔
414
                case <-ctx.Done():
56✔
415
                        return
56✔
416
                case update := <-sm.endpointsUpdateQueue:
192✔
417
                        sm.processStreamEndpointsUpdate(update)
192✔
418
                }
419
        }
420
}
421

422
func (sm *StreamManager) terminateStreamLocked(streamID uint64) {
11✔
423
        if sh, ok := sm.activeStreams[streamID]; ok {
22✔
424
                sh.Handler.Terminate(status.Errorf(codes.PermissionDenied, "no longer authorized"))
11✔
425
        }
11✔
426
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc