• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 19149648094

06 Nov 2025 08:57PM UTC coverage: 56.177% (+0.1%) from 56.051%
19149648094

push

github

web-flow
ssh: initial implementation of reverse tunnel EDS (#5915)

This implements the reverse tunnel Endpoint Discovery Service endpoint. 

There are still more tests to be written but those will be added in a
follow-up.

246 of 308 new or added lines in 7 files covered. (79.87%)

12 existing lines in 5 files now uncovered.

28467 of 50674 relevant lines covered (56.18%)

96.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/authorize/ssh_grpc.go
1
package authorize
2

3
import (
4
        "context"
5
        "errors"
6
        "io"
7

8
        "golang.org/x/sync/errgroup"
9
        "google.golang.org/grpc/codes"
10
        "google.golang.org/grpc/status"
11

12
        extensions_ssh "github.com/pomerium/envoy-custom/api/extensions/filters/network/ssh"
13
        "github.com/pomerium/pomerium/authorize/evaluator"
14
        "github.com/pomerium/pomerium/internal/log"
15
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
16
        "github.com/pomerium/pomerium/pkg/grpc/user"
17
        "github.com/pomerium/pomerium/pkg/ssh"
18
        "github.com/pomerium/pomerium/pkg/storage"
19
)
20

21
func (a *Authorize) ManageStream(stream extensions_ssh.StreamManagement_ManageStreamServer) error {
×
22
        event, err := stream.Recv()
×
23
        if err != nil {
×
24
                return err
×
25
        }
×
26
        // first message should be a downstream connected event
27
        downstream := event.GetEvent().GetDownstreamConnected()
×
28
        if downstream == nil {
×
29
                return status.Errorf(codes.Internal, "first message was not a downstream connected event")
×
30
        }
×
31

32
        handler := a.ssh.NewStreamHandler(stream.Context(), downstream)
×
33
        defer handler.Close()
×
34

×
35
        eg, ctx := errgroup.WithContext(stream.Context())
×
36

×
37
        eg.Go(func() error {
×
38
                for {
×
39
                        req, err := stream.Recv()
×
40
                        if err != nil {
×
41
                                if errors.Is(err, io.EOF) {
×
42
                                        return nil
×
43
                                }
×
44
                                return err
×
45
                        }
46
                        handler.ReadC() <- req
×
47
                }
48
        })
49

50
        eg.Go(func() error {
×
51
                for {
×
52
                        select {
×
53
                        case <-ctx.Done():
×
54
                                return nil
×
55
                        case msg, ok := <-handler.WriteC():
×
56
                                if !ok {
×
57
                                        // StreamHandler.close() called, no more messages to send
×
58
                                        return nil
×
59
                                }
×
60
                                if err := stream.Send(msg); err != nil {
×
61
                                        if errors.Is(err, io.EOF) {
×
62
                                                return nil
×
63
                                        }
×
64
                                        return err
×
65
                                }
66
                        }
67
                }
68
        })
69

70
        return handler.Run(ctx)
×
71
}
72

73
func (a *Authorize) ServeChannel(stream extensions_ssh.StreamManagement_ServeChannelServer) error {
×
74
        metadata, err := stream.Recv()
×
75
        if err != nil {
×
76
                return err
×
77
        }
×
78
        // first message contains metadata
79
        var typedMd extensions_ssh.FilterMetadata
×
80
        if md := metadata.GetMetadata(); md != nil {
×
81
                if err := md.GetTypedFilterMetadata()["com.pomerium.ssh"].UnmarshalTo(&typedMd); err != nil {
×
82
                        return err
×
83
                }
×
84
        } else {
×
85
                return status.Errorf(codes.Internal, "first message was not metadata")
×
86
        }
×
87
        handler := a.ssh.LookupStream(typedMd.GetStreamId())
×
88
        if handler == nil || !handler.IsExpectingInternalChannel() {
×
89
                return status.Errorf(codes.InvalidArgument, "stream not found")
×
90
        }
×
91

92
        return handler.ServeChannel(stream, &typedMd)
×
93
}
94

95
func (a *Authorize) EvaluateSSH(ctx context.Context, streamID uint64, req *ssh.Request) (*evaluator.Result, error) {
×
96
        ctx = a.withQuerierForCheckRequest(ctx)
×
97

×
98
        evalreq := evaluator.Request{
×
99
                HTTP: evaluator.RequestHTTP{
×
100
                        Hostname: req.Hostname,
×
101
                        IP:       req.SourceAddress,
×
102
                },
×
103
                SSH: evaluator.RequestSSH{
×
104
                        Username:  req.Username,
×
105
                        PublicKey: req.PublicKey,
×
106
                },
×
107
                Session: evaluator.RequestSession{
×
108
                        ID: req.SessionID,
×
109
                },
×
110
        }
×
111

×
112
        if req.Hostname == "" {
×
113
                evalreq.IsInternal = true
×
114
        } else {
×
115
                evalreq.Policy = a.currentConfig.Load().Options.GetRouteForSSHHostname(req.Hostname)
×
116
        }
×
117

118
        res, err := a.state.Load().evaluator.Evaluate(ctx, &evalreq)
×
119
        if err != nil {
×
120
                log.Ctx(ctx).Error().Err(err).Msg("error during OPA evaluation")
×
121
                return nil, err
×
122
        }
×
123

124
        allowed := res.Allow.Value && !res.Deny.Value
×
125

×
126
        if allowed {
×
127
                // TODO: only do this once, not on re-evaluate
×
NEW
128
                if err := a.ssh.SetSessionIDForStream(ctx, streamID, req.SessionID); err != nil {
×
129
                        log.Ctx(ctx).Error().Err(err).Msg("failed to set session id for stream")
×
130
                        return nil, err
×
131
                }
×
132
        }
133

134
        skipLogging := req.LogOnlyIfDenied && allowed
×
135
        if !skipLogging {
×
136
                s, _ := a.getDataBrokerSessionOrServiceAccount(ctx, req.SessionID, 0)
×
137

×
138
                var u *user.User
×
139
                if s != nil {
×
140
                        u, _ = a.getDataBrokerUser(ctx, s.GetUserId())
×
141
                }
×
142
                a.logAuthorizeCheck(ctx, &evalreq, res, s, u)
×
143
        }
144

145
        return res, nil
×
146
}
147

148
func (a *Authorize) InvalidateCacheForRecords(ctx context.Context, records ...*databroker.Record) {
×
149
        storage.InvalidateCacheForDataBrokerRecords(a.withQuerierForCheckRequest(ctx), records...)
×
150
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc