• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Semior001 / groxy / 17176148032

23 Aug 2025 01:41PM UTC coverage: 45.373% (-0.5%) from 45.847%
17176148032

Pull #13

github

Semior001
feat: template from a request message
Pull Request #13: add templating

105 of 238 new or added lines in 9 files covered. (44.12%)

2 existing lines in 2 files now uncovered.

1059 of 2334 relevant lines covered (45.37%)

17.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.5
/pkg/proxy/proxy.go
1
// Package proxy provides server and codec for proxying gRPC requests.
2
package proxy
3

4
import (
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log/slog"
9
        "net"
10

11
        "context"
12

13
        "github.com/Semior001/groxy/pkg/discovery"
14
        "github.com/Semior001/groxy/pkg/grpcx"
15
        "github.com/Semior001/groxy/pkg/proxy/middleware"
16
        "github.com/cappuccinotm/slogx"
17
        "google.golang.org/grpc"
18
        "google.golang.org/grpc/codes"
19
        "google.golang.org/grpc/health"
20
        healthpb "google.golang.org/grpc/health/grpc_health_v1"
21
        "google.golang.org/grpc/metadata"
22
        "google.golang.org/grpc/status"
23
)
24

25
//go:generate moq -out mocks/mocks.go --skip-ensure -pkg mocks . Matcher ServerStream
26

27
// ServerStream is a gRPC server stream.
28
type ServerStream grpc.ServerStream
29

30
// Matcher matches the request URI and incoming metadata to the
31
// registered rules.
32
type Matcher interface {
33
        MatchMetadata(string, metadata.MD) discovery.Matches // returns matches based on the method and metadata.
34
        Upstreams() []discovery.Upstream                     // returns all upstreams registered in the matcher
35
}
36

37
// Server is a gRPC server.
38
type Server struct {
39
        version string
40

41
        serverOpts []grpc.ServerOption
42
        matcher    Matcher
43

44
        signature  bool
45
        reflection bool
46
        debug      bool
47
        l          net.Listener
48
        grpc       *grpc.Server
49
}
50

51
// NewServer creates a new server.
52
func NewServer(m Matcher, opts ...Option) *Server {
1✔
53
        s := &Server{
1✔
54
                matcher:   m,
1✔
55
                signature: false,
1✔
56
        }
1✔
57

1✔
58
        for _, opt := range opts {
3✔
59
                opt(s)
2✔
60
        }
2✔
61

62
        return s
1✔
63
}
64

65
// Listen starts the server on the given address.
66
// Blocking call.
67
func (s *Server) Listen(addr string) (err error) {
1✔
68
        slog.Info("starting gRPC server", slog.Any("addr", addr))
1✔
69
        defer slog.Warn("gRPC server stopped", slogx.Error(err))
1✔
70

1✔
71
        healthHandler := health.NewServer()
1✔
72
        healthHandler.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
1✔
73

1✔
74
        noMatchHandler := func(any, grpc.ServerStream) error {
1✔
75
                return status.Error(codes.Internal, "{groxy} didn't match request to any rule")
×
76
        }
×
77

78
        s.grpc = grpc.NewServer(append(s.serverOpts,
1✔
79
                grpc.ForceServerCodec(grpcx.RawBytesCodec{}),
1✔
80
                grpc.UnknownServiceHandler(middleware.Wrap(noMatchHandler,
1✔
81
                        middleware.Recoverer("{groxy} panic"),
1✔
82
                        middleware.Maybe(s.signature, middleware.AppInfo("groxy", "Semior001", s.version)),
1✔
83
                        middleware.Log(s.debug, "/grpc.reflection."),
1✔
84
                        middleware.PassMetadata(),
1✔
85
                        middleware.Health(healthHandler),
1✔
86
                        middleware.Maybe(s.reflection, middleware.Chain(
1✔
87
                                middleware.Reflector{
1✔
88
                                        Logger:        slog.Default().With(slog.String("subsystem", "reflection")),
1✔
89
                                        UpstreamsFunc: s.matcher.Upstreams,
1✔
90
                                }.Middleware,
1✔
91
                        )),
1✔
92
                        s.matchMiddleware,
1✔
93
                        s.mockMiddleware, s.forwardMiddleware,
1✔
94
                )),
1✔
95
        )...)
1✔
96

1✔
97
        if s.l, err = net.Listen("tcp", addr); err != nil {
1✔
98
                return fmt.Errorf("register listener: %w", err)
×
99
        }
×
100

101
        if err = s.grpc.Serve(s.l); err != nil {
1✔
102
                return fmt.Errorf("serve: %w", err)
×
103
        }
×
104

UNCOV
105
        return nil
×
106
}
107

108
// Close stops the server.
109
func (s *Server) Close() { s.grpc.GracefulStop() }
1✔
110

111
// contextKey is the type of the keys under which this package stores values
// in a context; being a distinct type, it never equals a plain string key.
type contextKey string

// Keys used by the middlewares to hand per-call state down the chain.
const (
	// ctxMatch is the key of the rule the call was matched to.
	ctxMatch contextKey = "match"
	// ctxFirstRecv is the key of the raw first message read from the client.
	ctxFirstRecv contextKey = "first_recv"
)
117

118
func (s *Server) matchMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
119
        return func(srv any, stream grpc.ServerStream) error {
8✔
120
                ctx := stream.Context()
7✔
121

7✔
122
                mtd, ok := grpc.Method(ctx)
7✔
123
                if !ok {
7✔
124
                        slog.WarnContext(ctx, "failed to get method from context")
×
125
                        return status.Error(codes.Internal, "{groxy} failed to get method from the context")
×
126
                }
×
127

128
                md, ok := metadata.FromIncomingContext(ctx)
7✔
129
                if !ok {
7✔
130
                        md = metadata.New(nil)
×
131
                }
×
132

133
                matches := s.matcher.MatchMetadata(mtd, md)
7✔
134
                if len(matches) == 0 {
7✔
135
                        return next(srv, stream)
×
136
                }
×
137

138
                slog.DebugContext(ctx, "found matches", slog.Any("matches", matches))
7✔
139

7✔
140
                match := matches[0]
7✔
141
                if !matches.NeedsDeeperMatch() {
7✔
142
                        slog.DebugContext(ctx, "matched", slog.Any("match", match))
×
143
                        ctx = context.WithValue(ctx, ctxMatch, match)
×
144
                        return next(srv, grpcx.StreamWithContext(ctx, stream))
×
145
                }
×
146

147
                var firstRecv []byte
7✔
148
                if err := stream.RecvMsg(&firstRecv); err != nil {
7✔
149
                        slog.WarnContext(ctx, "failed to read the first RECV", slogx.Error(err))
×
150
                        return status.Errorf(codes.Internal, "{groxy} failed to read the first RECV: %v", err)
×
151
                }
×
152

153
                ctx = context.WithValue(ctx, ctxFirstRecv, firstRecv)
7✔
154
                if match, ok = matches.MatchMessage(ctx, firstRecv); !ok {
7✔
155
                        return next(srv, stream)
×
156
                }
×
157

158
                slog.DebugContext(ctx, "matched", slog.Any("match", match))
7✔
159
                ctx = context.WithValue(ctx, ctxMatch, match)
7✔
160
                return next(srv, grpcx.StreamWithContext(ctx, stream))
7✔
161
        }
162
}
163

164
func (s *Server) mockMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
165
        return func(srv any, stream grpc.ServerStream) error {
8✔
166
                ctx := stream.Context()
7✔
167

7✔
168
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
7✔
169
                if !ok {
7✔
170
                        return next(srv, stream)
×
171
                }
×
172

173
                if match.Mock == nil {
11✔
174
                        return next(srv, stream)
4✔
175
                }
4✔
176

177
                if len(match.Mock.Header) > 0 {
4✔
178
                        if err := stream.SetHeader(match.Mock.Header); err != nil {
1✔
179
                                slog.WarnContext(ctx, "failed to set header to the client", slogx.Error(err))
×
180
                        }
×
181
                }
182

183
                if len(match.Mock.Trailer) > 0 {
3✔
184
                        stream.SetTrailer(match.Mock.Trailer)
×
185
                }
×
186

187
                switch {
3✔
188
                case match.Mock.Body != nil:
1✔
189
                        var data map[string]any
1✔
190

1✔
191
                        firstRecv := ctx.Value(ctxFirstRecv)
1✔
192
                        if firstRecv != nil && match.Match.Message != nil {
2✔
193
                                dm, err := match.Match.Message.DataMap(ctx, firstRecv.([]byte))
1✔
194
                                if err != nil {
1✔
NEW
195
                                        slog.WarnContext(ctx, "failed to extract data from the first message", slogx.Error(err))
×
NEW
196
                                        return status.Errorf(codes.Internal, "{groxy} failed to extract data from the first message: %v", err)
×
NEW
197
                                }
×
198
                                data = dm
1✔
199
                        }
200

201
                        msg, err := match.Mock.Body.Generate(ctx, data)
1✔
202
                        if err != nil {
1✔
NEW
203
                                slog.WarnContext(ctx, "failed to generate mock body", slogx.Error(err))
×
NEW
204
                                return status.Errorf(codes.Internal, "{groxy} failed to generate mock body: %v", err)
×
NEW
205
                        }
×
206

207
                        if err := stream.SendMsg(msg); err != nil {
1✔
208
                                return status.Errorf(codes.Internal, "{groxy} failed to send message: %v", err)
×
209
                        }
×
210
                case match.Mock.Status != nil:
2✔
211
                        return match.Mock.Status.Err()
2✔
212
                default:
×
213
                        return status.Error(codes.Internal, "{groxy} empty mock")
×
214
                }
215

216
                // dump the rest of the stream
217
                for {
2✔
218
                        if err := stream.RecvMsg(nil); err != nil {
2✔
219
                                if errors.Is(err, io.EOF) {
2✔
220
                                        return nil
1✔
221
                                }
1✔
222

223
                                return status.Errorf(codes.Internal, "{groxy} failed to read the rest of the stream: %v", err)
×
224
                        }
225
                }
226
        }
227
}
228

229
func (s *Server) forwardMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
230
        return func(_ any, stream grpc.ServerStream) error {
5✔
231
                ctx := stream.Context()
4✔
232

4✔
233
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
4✔
234
                if !ok || match.Forward == nil {
4✔
235
                        return next(nil, stream)
×
236
                }
×
237

238
                ctx = plantHeader(ctx, match.Forward.Header)
4✔
239

4✔
240
                mtd, _ := grpc.Method(ctx)
4✔
241
                desc := &grpc.StreamDesc{ClientStreams: true, ServerStreams: true}
4✔
242

4✔
243
                upstreamHeader, upstreamTrailer := metadata.New(nil), metadata.New(nil)
4✔
244

4✔
245
                upstream, err := match.Forward.Upstream.NewStream(ctx, desc, mtd,
4✔
246
                        grpc.ForceCodec(grpcx.RawBytesCodec{}),
4✔
247
                        grpc.Header(&upstreamHeader),
4✔
248
                        grpc.Trailer(&upstreamTrailer))
4✔
249
                if err != nil {
4✔
250
                        return status.Errorf(codes.Internal, "{groxy} failed to create upstream: %v", err)
×
251
                }
×
252

253
                if firstRecv, _ := ctx.Value(ctxFirstRecv).([]byte); firstRecv != nil {
8✔
254
                        if err = upstream.SendMsg(firstRecv); err != nil {
4✔
255
                                return status.Errorf(codes.Internal,
×
256
                                        "{groxy} failed to send the first message to the upstream: %v", err)
×
257
                        }
×
258
                }
259

260
                defer func() {
8✔
261
                        stream.SetTrailer(metadata.Join(upstreamHeader, upstreamTrailer))
4✔
262

4✔
263
                        if err = upstream.CloseSend(); err != nil {
4✔
264
                                slog.WarnContext(ctx, "failed to close the upstream",
×
265
                                        slog.String("upstream_name", match.Forward.Upstream.Name()),
×
266
                                        slogx.Error(err))
×
267
                        }
×
268
                }()
269

270
                if err = grpcx.Pipe(upstream, stream); err != nil {
8✔
271
                        if errors.Is(err, io.EOF) {
7✔
272
                                return eofStatus(upstream)
3✔
273
                        }
3✔
274
                        if st := grpcx.StatusFromError(err); st != nil {
2✔
275
                                return st.Err()
1✔
276
                        }
1✔
277
                        slog.WarnContext(ctx, "failed to pipe",
×
278
                                slog.String("upstream_name", match.Forward.Upstream.Name()),
×
279
                                slogx.Error(err))
×
280
                        return status.Errorf(codes.Internal, "{groxy} failed to pipe messages to the upstream")
×
281
                }
282

283
                return nil
×
284
        }
285
}
286

287
func eofStatus(upstream grpc.ClientStream) (err error) {
3✔
288
        if err = upstream.RecvMsg(nil); err == nil {
3✔
289
                return status.Error(codes.Internal, "{groxy} unexpected EOF from the upstream")
×
290
        }
×
291
        if st := grpcx.StatusFromError(err); st != nil {
3✔
292
                return st.Err()
×
293
        }
×
294
        if !errors.Is(err, io.EOF) {
3✔
295
                return status.Errorf(codes.Internal, "{groxy} failed to read the EOF from the upstream: %v", err)
×
296
        }
×
297
        return nil // if there is just EOF then probably everything is fine
3✔
298
}
299

300
func plantHeader(ctx context.Context, header metadata.MD) context.Context {
4✔
301
        outMD, ok := metadata.FromOutgoingContext(ctx)
4✔
302
        if !ok {
4✔
303
                outMD = metadata.New(nil)
×
304
        }
×
305

306
        for k, v := range header {
4✔
307
                if _, ok = outMD[k]; !ok {
×
308
                        outMD[k] = v
×
309
                }
×
310
        }
311

312
        return metadata.NewOutgoingContext(ctx, outMD)
4✔
313
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc