• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Semior001 / groxy / 20489062660

24 Dec 2025 03:17PM UTC coverage: 47.487% (-0.2%) from 47.691%
20489062660

push

github

web-flow
feat: added rewrite directive to forward option (#21)

2 of 6 new or added lines in 2 files covered. (33.33%)

9 existing lines in 1 file now uncovered.

1134 of 2388 relevant lines covered (47.49%)

33.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.15
/pkg/proxy/proxy.go
1
// Package proxy provides server and codec for proxying gRPC requests.
2
package proxy
3

4
import (
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log/slog"
9
        "net"
10
        "time"
11

12
        "context"
13

14
        "github.com/Semior001/groxy/pkg/discovery"
15
        "github.com/Semior001/groxy/pkg/grpcx"
16
        "github.com/Semior001/groxy/pkg/proxy/middleware"
17
        "github.com/cappuccinotm/slogx"
18
        "google.golang.org/grpc"
19
        "google.golang.org/grpc/codes"
20
        "google.golang.org/grpc/health"
21
        healthpb "google.golang.org/grpc/health/grpc_health_v1"
22
        "google.golang.org/grpc/metadata"
23
        "google.golang.org/grpc/status"
24
)
25

26
//go:generate moq -out mocks/mocks.go --skip-ensure -pkg mocks . Matcher ServerStream
27

28
// ServerStream is a gRPC server stream.
29
type ServerStream grpc.ServerStream
30

31
// Matcher matches the request URI and incoming metadata to the
32
// registered rules.
33
type Matcher interface {
34
        MatchMetadata(string, metadata.MD) discovery.Matches // returns matches based on the method and metadata.
35
        Upstreams() []discovery.Upstream                     // returns all upstreams registered in the matcher
36
}
37

38
// Server is a gRPC server.
39
type Server struct {
40
        version string
41

42
        serverOpts []grpc.ServerOption
43
        matcher    Matcher
44

45
        signature  bool
46
        reflection bool
47
        debug      bool
48
        l          net.Listener
49
        grpc       *grpc.Server
50
}
51

52
// NewServer creates a new server.
53
func NewServer(m Matcher, opts ...Option) *Server {
1✔
54
        s := &Server{
1✔
55
                matcher:   m,
1✔
56
                signature: false,
1✔
57
        }
1✔
58

1✔
59
        for _, opt := range opts {
3✔
60
                opt(s)
2✔
61
        }
2✔
62

63
        return s
1✔
64
}
65

66
// Listen starts the server on the given address.
67
// Blocking call.
68
func (s *Server) Listen(addr string) (err error) {
1✔
69
        slog.Info("starting gRPC server", slog.Any("addr", addr))
1✔
70
        defer slog.Warn("gRPC server stopped", slogx.Error(err))
1✔
71

1✔
72
        healthHandler := health.NewServer()
1✔
73
        healthHandler.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
1✔
74

1✔
75
        noMatchHandler := func(any, grpc.ServerStream) error {
1✔
76
                return status.Error(codes.Internal, "{groxy} didn't match request to any rule")
×
77
        }
×
78

79
        s.grpc = grpc.NewServer(append(s.serverOpts,
1✔
80
                grpc.ForceServerCodec(grpcx.RawBytesCodec{}),
1✔
81
                grpc.UnknownServiceHandler(middleware.Wrap(noMatchHandler,
1✔
82
                        middleware.Recoverer("{groxy} panic"),
1✔
83
                        middleware.Maybe(s.signature, middleware.AppInfo("groxy", "Semior001", s.version)),
1✔
84
                        middleware.Log(s.debug, "/grpc.reflection."),
1✔
85
                        middleware.PassMetadata(),
1✔
86
                        middleware.Health(healthHandler),
1✔
87
                        middleware.Maybe(s.reflection, middleware.Chain(
1✔
88
                                middleware.Reflector{
1✔
89
                                        Logger:        slog.Default().With(slog.String("subsystem", "reflection")),
1✔
90
                                        UpstreamsFunc: s.matcher.Upstreams,
1✔
91
                                }.Middleware,
1✔
92
                        )),
1✔
93
                        s.matchMiddleware,
1✔
94
                        s.mockMiddleware, s.forwardMiddleware,
1✔
95
                )),
1✔
96
        )...)
1✔
97

1✔
98
        if s.l, err = net.Listen("tcp", addr); err != nil {
1✔
99
                return fmt.Errorf("register listener: %w", err)
×
100
        }
×
101

102
        if err = s.grpc.Serve(s.l); err != nil {
1✔
103
                return fmt.Errorf("serve: %w", err)
×
104
        }
×
105

106
        return nil
1✔
107
}
108

109
// Close stops the server.
110
func (s *Server) Close() { s.grpc.GracefulStop() }
1✔
111

112
type contextKey string
113

114
var (
115
        ctxMatch     = contextKey("match")
116
        ctxFirstRecv = contextKey("first_recv")
117
)
118

119
func (s *Server) matchMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
120
        return func(srv any, stream grpc.ServerStream) error {
8✔
121
                ctx := stream.Context()
7✔
122

7✔
123
                mtd, ok := grpc.Method(ctx)
7✔
124
                if !ok {
7✔
125
                        slog.WarnContext(ctx, "failed to get method from context")
×
126
                        return status.Error(codes.Internal, "{groxy} failed to get method from the context")
×
127
                }
×
128

129
                md, ok := metadata.FromIncomingContext(ctx)
7✔
130
                if !ok {
7✔
131
                        md = metadata.New(nil)
×
132
                }
×
133

134
                matches := s.matcher.MatchMetadata(mtd, md)
7✔
135
                if len(matches) == 0 {
7✔
136
                        return next(srv, stream)
×
137
                }
×
138

139
                slog.DebugContext(ctx, "found matches", slog.Any("matches", matches))
7✔
140

7✔
141
                match := matches[0]
7✔
142
                if !matches.NeedsDeeperMatch() {
7✔
143
                        slog.DebugContext(ctx, "matched", slog.Any("match", match))
×
144
                        ctx = context.WithValue(ctx, ctxMatch, match)
×
145
                        return next(srv, grpcx.StreamWithContext(ctx, stream))
×
146
                }
×
147

148
                var firstRecv []byte
7✔
149
                if err := stream.RecvMsg(&firstRecv); err != nil {
7✔
150
                        slog.WarnContext(ctx, "failed to read the first RECV", slogx.Error(err))
×
151
                        return status.Errorf(codes.Internal, "{groxy} failed to read the first RECV: %v", err)
×
152
                }
×
153

154
                ctx = context.WithValue(ctx, ctxFirstRecv, firstRecv)
7✔
155
                if match, ok = matches.MatchMessage(ctx, firstRecv); !ok {
7✔
156
                        return next(srv, stream)
×
157
                }
×
158

159
                slog.DebugContext(ctx, "matched", slog.Any("match", match))
7✔
160
                ctx = context.WithValue(ctx, ctxMatch, match)
7✔
161
                return next(srv, grpcx.StreamWithContext(ctx, stream))
7✔
162
        }
163
}
164

165
func (s *Server) mockMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
166
        return func(srv any, stream grpc.ServerStream) error {
8✔
167
                ctx := stream.Context()
7✔
168

7✔
169
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
7✔
170
                if !ok {
7✔
171
                        return next(srv, stream)
×
172
                }
×
173

174
                if match.Mock == nil {
11✔
175
                        return next(srv, stream)
4✔
176
                }
4✔
177

178
                if match.Mock.Wait > 0 {
3✔
179
                        slog.DebugContext(ctx, "waiting before responding", slog.Any("wait", match.Mock.Wait))
×
180
                        select {
×
181
                        case <-ctx.Done():
×
182
                                slog.WarnContext(ctx, "context done while waiting",
×
183
                                        slog.Any("wait", match.Mock.Wait),
×
184
                                        slogx.Error(ctx.Err()))
×
185
                                return status.Error(codes.Canceled, "{groxy} context done while waiting")
×
186
                        case <-time.After(match.Mock.Wait):
×
187
                        }
188
                }
189

190
                if len(match.Mock.Header) > 0 {
4✔
191
                        if err := stream.SetHeader(match.Mock.Header); err != nil {
1✔
192
                                slog.WarnContext(ctx, "failed to set header to the client", slogx.Error(err))
×
193
                        }
×
194
                }
195

196
                if len(match.Mock.Trailer) > 0 {
3✔
197
                        stream.SetTrailer(match.Mock.Trailer)
×
198
                }
×
199

200
                switch {
3✔
201
                case match.Mock.Body != nil:
1✔
202
                        var data map[string]any
1✔
203

1✔
204
                        firstRecv := ctx.Value(ctxFirstRecv)
1✔
205
                        if firstRecv != nil && match.Match.Message != nil {
2✔
206
                                dm, err := match.Match.Message.DataMap(ctx, firstRecv.([]byte))
1✔
207
                                if err != nil {
1✔
208
                                        slog.WarnContext(ctx, "failed to extract data from the first message", slogx.Error(err))
×
209
                                        return status.Errorf(codes.Internal, "{groxy} failed to extract data from the first message: %v", err)
×
210
                                }
×
211
                                data = dm
1✔
212
                        }
213

214
                        msg, err := match.Mock.Body.Generate(ctx, data)
1✔
215
                        if err != nil {
1✔
216
                                slog.WarnContext(ctx, "failed to generate mock body", slogx.Error(err))
×
217
                                return status.Errorf(codes.Internal, "{groxy} failed to generate mock body: %v", err)
×
218
                        }
×
219

220
                        if err = stream.SendMsg(msg); err != nil {
1✔
221
                                return status.Errorf(codes.Internal, "{groxy} failed to send message: %v", err)
×
222
                        }
×
223
                case match.Mock.Status != nil:
2✔
224
                        return match.Mock.Status.Err()
2✔
225
                default:
×
226
                        return status.Error(codes.Internal, "{groxy} empty mock")
×
227
                }
228

229
                // dump the rest of the stream
230
                for {
2✔
231
                        if err := stream.RecvMsg(nil); err != nil {
2✔
232
                                if errors.Is(err, io.EOF) {
2✔
233
                                        return nil
1✔
234
                                }
1✔
235

236
                                return status.Errorf(codes.Internal, "{groxy} failed to read the rest of the stream: %v", err)
×
237
                        }
238
                }
239
        }
240
}
241

242
func (s *Server) forwardMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
243
        return func(_ any, stream grpc.ServerStream) error {
5✔
244
                ctx := stream.Context()
4✔
245

4✔
246
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
4✔
247
                if !ok || match.Forward == nil {
4✔
248
                        return next(nil, stream)
×
249
                }
×
250

251
                ctx = plantHeader(ctx, match.Forward.Header)
4✔
252

4✔
253
                mtd, _ := grpc.Method(ctx)
4✔
254
                desc := &grpc.StreamDesc{ClientStreams: true, ServerStreams: true}
4✔
255

4✔
256
                upstreamHeader, upstreamTrailer := metadata.New(nil), metadata.New(nil)
4✔
257

4✔
258
                if match.Forward.Rewrite != "" {
4✔
NEW
259
                        mtd = match.Match.URI.ReplaceAllString(mtd, match.Forward.Rewrite)
×
NEW
260
                }
×
261

262
                upstream, err := match.Forward.Upstream.NewStream(ctx, desc, mtd,
4✔
263
                        grpc.ForceCodec(grpcx.RawBytesCodec{}),
4✔
264
                        grpc.Header(&upstreamHeader),
4✔
265
                        grpc.Trailer(&upstreamTrailer))
4✔
266
                if err != nil {
4✔
267
                        return status.Errorf(codes.Internal, "{groxy} failed to create upstream: %v", err)
×
268
                }
×
269

270
                if firstRecv, _ := ctx.Value(ctxFirstRecv).([]byte); firstRecv != nil {
8✔
271
                        if err = upstream.SendMsg(firstRecv); err != nil {
4✔
272
                                return status.Errorf(codes.Internal,
×
273
                                        "{groxy} failed to send the first message to the upstream: %v", err)
×
274
                        }
×
275
                }
276

277
                defer func() {
8✔
278
                        stream.SetTrailer(metadata.Join(upstreamHeader, upstreamTrailer))
4✔
279

4✔
280
                        if err = upstream.CloseSend(); err != nil {
4✔
281
                                slog.WarnContext(ctx, "failed to close the upstream",
×
282
                                        slog.String("upstream_name", match.Forward.Upstream.Name()),
×
283
                                        slogx.Error(err))
×
284
                        }
×
285
                }()
286

287
                if err = grpcx.Pipe(upstream, stream); err != nil {
8✔
288
                        if errors.Is(err, io.EOF) {
7✔
289
                                return eofStatus(upstream)
3✔
290
                        }
3✔
291
                        if st := grpcx.StatusFromError(err); st != nil {
2✔
292
                                return st.Err()
1✔
293
                        }
1✔
294
                        slog.WarnContext(ctx, "failed to pipe",
×
295
                                slog.String("upstream_name", match.Forward.Upstream.Name()),
×
296
                                slogx.Error(err))
×
297
                        return status.Errorf(codes.Internal, "{groxy} failed to pipe messages to the upstream")
×
298
                }
299

300
                return nil
×
301
        }
302
}
303

304
func eofStatus(upstream grpc.ClientStream) (err error) {
3✔
305
        if err = upstream.RecvMsg(nil); err == nil {
3✔
306
                return status.Error(codes.Internal, "{groxy} unexpected EOF from the upstream")
×
307
        }
×
308
        if st := grpcx.StatusFromError(err); st != nil {
3✔
309
                return st.Err()
×
310
        }
×
311
        if !errors.Is(err, io.EOF) {
3✔
312
                return status.Errorf(codes.Internal, "{groxy} failed to read the EOF from the upstream: %v", err)
×
313
        }
×
314
        return nil // if there is just EOF then probably everything is fine
3✔
315
}
316

317
func plantHeader(ctx context.Context, header metadata.MD) context.Context {
4✔
318
        outMD, ok := metadata.FromOutgoingContext(ctx)
4✔
319
        if !ok {
4✔
320
                outMD = metadata.New(nil)
×
321
        }
×
322

323
        for k, v := range header {
4✔
324
                if _, ok = outMD[k]; !ok {
×
325
                        outMD[k] = v
×
326
                }
×
327
        }
328

329
        return metadata.NewOutgoingContext(ctx, outMD)
4✔
330
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc