• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Semior001 / groxy / 17013031799

16 Aug 2025 09:21PM UTC coverage: 46.883% (+1.0%) from 45.847%
17013031799

Pull #13

github

Semior001
Merge branch 'master' into feature/templating
Pull Request #13: add templating

97 of 124 new or added lines in 7 files covered. (78.23%)

106 existing lines in 6 files now uncovered.

1053 of 2246 relevant lines covered (46.88%)

18.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.84
/pkg/proxy/proxy.go
1
// Package proxy provides server and codec for proxying gRPC requests.
2
package proxy
3

4
import (
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log/slog"
9
        "net"
10

11
        "context"
12

13
        "github.com/Semior001/groxy/pkg/discovery"
14
        "github.com/Semior001/groxy/pkg/grpcx"
15
        "github.com/Semior001/groxy/pkg/proxy/middleware"
16
        "github.com/cappuccinotm/slogx"
17
        "google.golang.org/grpc"
18
        "google.golang.org/grpc/codes"
19
        "google.golang.org/grpc/health"
20
        healthpb "google.golang.org/grpc/health/grpc_health_v1"
21
        "google.golang.org/grpc/metadata"
22
        "google.golang.org/grpc/status"
23
)
24

25
//go:generate moq -out mocks/mocks.go --skip-ensure -pkg mocks . Matcher ServerStream
26

27
// ServerStream is a gRPC server stream.
// It is a named type over grpc.ServerStream; the go:generate directive in
// this file lists it so that moq generates a mock for it.
28
type ServerStream grpc.ServerStream
29

30
// Matcher matches the request URI and incoming metadata to the
31
// registered rules. The server calls MatchMetadata once per incoming stream
// with the gRPC method name and the incoming metadata, and hands Upstreams
// to the reflection middleware.
32
type Matcher interface {
33
        MatchMetadata(string, metadata.MD) discovery.Matches // returns matches based on the method and metadata.
34
        Upstreams() []discovery.Upstream                     // returns all upstreams registered in the matcher
35
}
36

37
// Server is a gRPC server.
// It is configured through NewServer and its options; the listener and the
// underlying grpc.Server are created by Listen.
38
type Server struct {
39
        version string // passed to middleware.AppInfo in Listen
40

41
        serverOpts []grpc.ServerOption // extra options passed to grpc.NewServer in Listen
42
        matcher    Matcher             // resolves rules for incoming requests
43

44
        signature  bool           // enables the middleware.AppInfo middleware
45
        reflection bool           // enables the middleware.Reflector middleware
46
        debug      bool           // passed to middleware.Log
47
        l          net.Listener   // set by Listen
48
        grpc       *grpc.Server   // set by Listen, stopped by Close
49
}
50

51
// NewServer creates a new server.
52
func NewServer(m Matcher, opts ...Option) *Server {
1✔
53
        s := &Server{
1✔
54
                matcher:   m,
1✔
55
                signature: false,
1✔
56
        }
1✔
57

1✔
58
        for _, opt := range opts {
3✔
59
                opt(s)
2✔
60
        }
2✔
61

62
        return s
1✔
63
}
64

65
// Listen starts the server on the given address.
66
// Blocking call.
67
func (s *Server) Listen(addr string) (err error) {
1✔
68
        slog.Info("starting gRPC server", slog.Any("addr", addr))
1✔
69
        defer slog.Warn("gRPC server stopped", slogx.Error(err))
1✔
70

1✔
71
        healthHandler := health.NewServer()
1✔
72
        healthHandler.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
1✔
73

1✔
74
        noMatchHandler := func(any, grpc.ServerStream) error {
1✔
75
                return status.Error(codes.Internal, "{groxy} didn't match request to any rule")
×
76
        }
×
77

78
        s.grpc = grpc.NewServer(append(s.serverOpts,
1✔
79
                grpc.ForceServerCodec(grpcx.RawBytesCodec{}),
1✔
80
                grpc.UnknownServiceHandler(middleware.Wrap(noMatchHandler,
1✔
81
                        middleware.Recoverer("{groxy} panic"),
1✔
82
                        middleware.Maybe(s.signature, middleware.AppInfo("groxy", "Semior001", s.version)),
1✔
83
                        middleware.Log(s.debug, "/grpc.reflection."),
1✔
84
                        middleware.PassMetadata(),
1✔
85
                        middleware.Health(healthHandler),
1✔
86
                        middleware.Maybe(s.reflection, middleware.Chain(
1✔
87
                                middleware.Reflector{
1✔
88
                                        Logger:        slog.Default().With(slog.String("subsystem", "reflection")),
1✔
89
                                        UpstreamsFunc: s.matcher.Upstreams,
1✔
90
                                }.Middleware,
1✔
91
                        )),
1✔
92
                        s.matchMiddleware,
1✔
93
                        s.mockMiddleware, s.forwardMiddleware,
1✔
94
                )),
1✔
95
        )...)
1✔
96

1✔
97
        if s.l, err = net.Listen("tcp", addr); err != nil {
1✔
98
                return fmt.Errorf("register listener: %w", err)
×
99
        }
×
100

101
        if err = s.grpc.Serve(s.l); err != nil {
1✔
102
                return fmt.Errorf("serve: %w", err)
×
103
        }
×
104

105
        return nil
1✔
106
}
107

108
// Close stops the server.
109
func (s *Server) Close() { s.grpc.GracefulStop() }
1✔
110

111
// contextKey is the type of the keys under which this package stores values
// in the stream context.
type contextKey string
112

113
var (
114
        ctxMatch     = contextKey("match") // matched rule, set by matchMiddleware, read by mockMiddleware and forwardMiddleware
115
        ctxFirstRecv = contextKey("first_recv") // first message read by matchMiddleware, re-sent to the upstream by forwardMiddleware
116
)
117

118
func (s *Server) matchMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
119
        return func(srv any, stream grpc.ServerStream) error {
8✔
120
                ctx := stream.Context()
7✔
121

7✔
122
                mtd, ok := grpc.Method(ctx)
7✔
123
                if !ok {
7✔
124
                        slog.WarnContext(ctx, "failed to get method from context")
×
125
                        return status.Error(codes.Internal, "{groxy} failed to get method from the context")
×
126
                }
×
127

128
                md, ok := metadata.FromIncomingContext(ctx)
7✔
129
                if !ok {
7✔
130
                        md = metadata.New(nil)
×
131
                }
×
132

133
                matches := s.matcher.MatchMetadata(mtd, md)
7✔
134
                if len(matches) == 0 {
7✔
135
                        return next(srv, stream)
×
136
                }
×
137

138
                slog.DebugContext(ctx, "found matches", slog.Any("matches", matches))
7✔
139

7✔
140
                match := matches[0]
7✔
141
                if !matches.NeedsDeeperMatch() {
7✔
142
                        slog.DebugContext(ctx, "matched", slog.Any("match", match))
×
143
                        ctx = context.WithValue(ctx, ctxMatch, match)
×
144
                        return next(srv, grpcx.StreamWithContext(ctx, stream))
×
145
                }
×
146

147
                var firstRecv []byte
7✔
148
                if err := stream.RecvMsg(&firstRecv); err != nil {
7✔
149
                        slog.WarnContext(ctx, "failed to read the first RECV", slogx.Error(err))
×
150
                        return status.Errorf(codes.Internal, "{groxy} failed to read the first RECV: %v", err)
×
151
                }
×
152

153
                ctx = context.WithValue(ctx, ctxFirstRecv, firstRecv)
7✔
154
                if match, ok = matches.MatchMessage(ctx, firstRecv); !ok {
7✔
155
                        return next(srv, stream)
×
156
                }
×
157

158
                slog.DebugContext(ctx, "matched", slog.Any("match", match))
7✔
159
                ctx = context.WithValue(ctx, ctxMatch, match)
7✔
160
                return next(srv, grpcx.StreamWithContext(ctx, stream))
7✔
161
        }
162
}
163

164
func (s *Server) mockMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
165
        return func(srv any, stream grpc.ServerStream) error {
8✔
166
                ctx := stream.Context()
7✔
167

7✔
168
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
7✔
169
                if !ok {
7✔
170
                        return next(srv, stream)
×
171
                }
×
172

173
                if match.Mock == nil {
11✔
174
                        return next(srv, stream)
4✔
175
                }
4✔
176

177
                if len(match.Mock.Header) > 0 {
4✔
178
                        if err := stream.SetHeader(match.Mock.Header); err != nil {
1✔
179
                                slog.WarnContext(ctx, "failed to set header to the client", slogx.Error(err))
×
180
                        }
×
181
                }
182

183
                if len(match.Mock.Trailer) > 0 {
3✔
184
                        stream.SetTrailer(match.Mock.Trailer)
×
185
                }
×
186

187
                switch {
3✔
188
                case match.Mock.Body != nil:
1✔
189
                        msg, err := match.Mock.Body.Generate(ctx, nil)
1✔
190
                        if err != nil {
1✔
NEW
191
                                slog.WarnContext(ctx, "failed to generate mock body", slogx.Error(err))
×
NEW
192
                                return status.Errorf(codes.Internal, "{groxy} failed to generate mock body: %v", err)
×
NEW
193
                        }
×
194

195
                        if err := stream.SendMsg(msg); err != nil {
1✔
196
                                return status.Errorf(codes.Internal, "{groxy} failed to send message: %v", err)
×
197
                        }
×
198
                case match.Mock.Status != nil:
2✔
199
                        return match.Mock.Status.Err()
2✔
200
                default:
×
201
                        return status.Error(codes.Internal, "{groxy} empty mock")
×
202
                }
203

204
                // dump the rest of the stream
205
                for {
2✔
206
                        if err := stream.RecvMsg(nil); err != nil {
2✔
207
                                if errors.Is(err, io.EOF) {
2✔
208
                                        return nil
1✔
209
                                }
1✔
210

UNCOV
211
                                return status.Errorf(codes.Internal, "{groxy} failed to read the rest of the stream: %v", err)
×
212
                        }
213
                }
214
        }
215
}
216

217
func (s *Server) forwardMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
218
        return func(_ any, stream grpc.ServerStream) error {
5✔
219
                ctx := stream.Context()
4✔
220

4✔
221
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
4✔
222
                if !ok || match.Forward == nil {
4✔
223
                        return next(nil, stream)
×
UNCOV
224
                }
×
225

226
                ctx = plantHeader(ctx, match.Forward.Header)
4✔
227

4✔
228
                mtd, _ := grpc.Method(ctx)
4✔
229
                desc := &grpc.StreamDesc{ClientStreams: true, ServerStreams: true}
4✔
230

4✔
231
                upstreamHeader, upstreamTrailer := metadata.New(nil), metadata.New(nil)
4✔
232

4✔
233
                upstream, err := match.Forward.Upstream.NewStream(ctx, desc, mtd,
4✔
234
                        grpc.ForceCodec(grpcx.RawBytesCodec{}),
4✔
235
                        grpc.Header(&upstreamHeader),
4✔
236
                        grpc.Trailer(&upstreamTrailer))
4✔
237
                if err != nil {
4✔
UNCOV
238
                        return status.Errorf(codes.Internal, "{groxy} failed to create upstream: %v", err)
×
UNCOV
239
                }
×
240

241
                if firstRecv, _ := ctx.Value(ctxFirstRecv).([]byte); firstRecv != nil {
8✔
242
                        if err = upstream.SendMsg(firstRecv); err != nil {
4✔
UNCOV
243
                                return status.Errorf(codes.Internal,
×
UNCOV
244
                                        "{groxy} failed to send the first message to the upstream: %v", err)
×
UNCOV
245
                        }
×
246
                }
247

248
                defer func() {
8✔
249
                        stream.SetTrailer(metadata.Join(upstreamHeader, upstreamTrailer))
4✔
250

4✔
251
                        if err = upstream.CloseSend(); err != nil {
4✔
UNCOV
252
                                slog.WarnContext(ctx, "failed to close the upstream",
×
UNCOV
253
                                        slog.String("upstream_name", match.Forward.Upstream.Name()),
×
UNCOV
254
                                        slogx.Error(err))
×
255
                        }
×
256
                }()
257

258
                if err = grpcx.Pipe(upstream, stream); err != nil {
8✔
259
                        if errors.Is(err, io.EOF) {
7✔
260
                                return eofStatus(upstream)
3✔
261
                        }
3✔
262
                        if st := grpcx.StatusFromError(err); st != nil {
2✔
263
                                return st.Err()
1✔
264
                        }
1✔
265
                        slog.WarnContext(ctx, "failed to pipe",
×
266
                                slog.String("upstream_name", match.Forward.Upstream.Name()),
×
267
                                slogx.Error(err))
×
UNCOV
268
                        return status.Errorf(codes.Internal, "{groxy} failed to pipe messages to the upstream")
×
269
                }
270

UNCOV
271
                return nil
×
272
        }
273
}
274

275
func eofStatus(upstream grpc.ClientStream) (err error) {
3✔
276
        if err = upstream.RecvMsg(nil); err == nil {
3✔
277
                return status.Error(codes.Internal, "{groxy} unexpected EOF from the upstream")
×
278
        }
×
279
        if st := grpcx.StatusFromError(err); st != nil {
3✔
280
                return st.Err()
×
UNCOV
281
        }
×
282
        if !errors.Is(err, io.EOF) {
3✔
283
                return status.Errorf(codes.Internal, "{groxy} failed to read the EOF from the upstream: %v", err)
×
UNCOV
284
        }
×
285
        return nil // if there is just EOF then probably everything is fine
3✔
286
}
287

288
func plantHeader(ctx context.Context, header metadata.MD) context.Context {
4✔
289
        outMD, ok := metadata.FromOutgoingContext(ctx)
4✔
290
        if !ok {
4✔
UNCOV
291
                outMD = metadata.New(nil)
×
292
        }
×
293

294
        for k, v := range header {
4✔
295
                if _, ok = outMD[k]; !ok {
×
296
                        outMD[k] = v
×
UNCOV
297
                }
×
298
        }
299

300
        return metadata.NewOutgoingContext(ctx, outMD)
4✔
301
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc