• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Semior001 / groxy / 15993267150

01 Jul 2025 07:52AM UTC coverage: 45.86% (+0.2%) from 45.688%
15993267150

Pull #13

github

Semior001
feat: added uuid as a default function
Pull Request #13: add templating

37 of 49 new or added lines in 3 files covered. (75.51%)

91 existing lines in 5 files now uncovered.

997 of 2174 relevant lines covered (45.86%)

18.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.57
/pkg/proxy/proxy.go
1
// Package proxy provides server and codec for proxying gRPC requests.
2
package proxy
3

4
import (
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log/slog"
9
        "net"
10

11
        "github.com/Semior001/groxy/pkg/discovery"
12
        "github.com/Semior001/groxy/pkg/proxy/middleware"
13
        "github.com/cappuccinotm/slogx"
14
        "google.golang.org/grpc"
15
        "google.golang.org/grpc/codes"
16
        "google.golang.org/grpc/metadata"
17
        "google.golang.org/grpc/status"
18
        "github.com/Semior001/groxy/pkg/grpcx"
19
        "context"
20
        "google.golang.org/grpc/health"
21
        healthpb "google.golang.org/grpc/health/grpc_health_v1"
22
)
23

24
//go:generate moq -out mocks/mocks.go --skip-ensure -pkg mocks . Matcher ServerStream
25

26
// ServerStream is a gRPC server stream.
27
type ServerStream grpc.ServerStream
28

29
// Matcher matches the request URI and incoming metadata to the
30
// registered rules.
31
type Matcher interface {
32
        MatchMetadata(string, metadata.MD) discovery.Matches // returns matches based on the method and metadata.
33
        Upstreams() []discovery.Upstream                     // returns all upstreams registered in the matcher
34
}
35

36
// Server is a gRPC server.
37
type Server struct {
38
        version string
39

40
        serverOpts []grpc.ServerOption
41
        matcher    Matcher
42

43
        signature  bool
44
        reflection bool
45
        debug      bool
46
        l          net.Listener
47
        grpc       *grpc.Server
48
}
49

50
// NewServer creates a new server.
51
func NewServer(m Matcher, opts ...Option) *Server {
1✔
52
        s := &Server{
1✔
53
                matcher:   m,
1✔
54
                signature: false,
1✔
55
        }
1✔
56

1✔
57
        for _, opt := range opts {
3✔
58
                opt(s)
2✔
59
        }
2✔
60

61
        return s
1✔
62
}
63

64
// Listen starts the server on the given address.
65
// Blocking call.
66
func (s *Server) Listen(addr string) (err error) {
1✔
67
        slog.Info("starting gRPC server", slog.Any("addr", addr))
1✔
68
        defer slog.Warn("gRPC server stopped", slogx.Error(err))
1✔
69

1✔
70
        healthHandler := health.NewServer()
1✔
71
        healthHandler.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
1✔
72

1✔
73
        noMatchHandler := func(any, grpc.ServerStream) error {
1✔
UNCOV
74
                return status.Error(codes.Internal, "{groxy} didn't match request to any rule")
×
75
        }
×
76

77
        s.grpc = grpc.NewServer(append(s.serverOpts,
1✔
78
                grpc.ForceServerCodec(grpcx.RawBytesCodec{}),
1✔
79
                grpc.UnknownServiceHandler(middleware.Wrap(noMatchHandler,
1✔
80
                        middleware.Recoverer("{groxy} panic"),
1✔
81
                        middleware.Maybe(s.signature, middleware.AppInfo("groxy", "Semior001", s.version)),
1✔
82
                        middleware.Log(s.debug, "/grpc.reflection."),
1✔
83
                        middleware.PassMetadata(),
1✔
84
                        middleware.Health(healthHandler),
1✔
85
                        middleware.Maybe(s.reflection, middleware.Chain(
1✔
86
                                middleware.Reflector{
1✔
87
                                        Logger:        slog.Default().With(slog.String("subsystem", "reflection")),
1✔
88
                                        UpstreamsFunc: s.matcher.Upstreams,
1✔
89
                                }.Middleware,
1✔
90
                        )),
1✔
91
                        s.matchMiddleware,
1✔
92
                        s.mockMiddleware, s.forwardMiddleware,
1✔
93
                )),
1✔
94
        )...)
1✔
95

1✔
96
        if s.l, err = net.Listen("tcp", addr); err != nil {
1✔
UNCOV
97
                return fmt.Errorf("register listener: %w", err)
×
98
        }
×
99

100
        if err = s.grpc.Serve(s.l); err != nil {
1✔
UNCOV
101
                return fmt.Errorf("serve: %w", err)
×
102
        }
×
103

104
        return nil
1✔
105
}
106

107
// Close stops the server.
108
func (s *Server) Close() { s.grpc.GracefulStop() }
1✔
109

110
// contextKey is the private type of the keys this package stores in a
// context, so they cannot collide with plain string keys.
type contextKey string

// Keys the middlewares use to pass data along the stream context.
var (
	// ctxMatch is the key under which matchMiddleware stores the matched rule.
	ctxMatch contextKey = "match"
	// ctxFirstRecv is the key under which matchMiddleware stores the first
	// message it has read from the client stream.
	ctxFirstRecv contextKey = "first_recv"
)
116

117
func (s *Server) matchMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
118
        return func(srv any, stream grpc.ServerStream) error {
8✔
119
                ctx := stream.Context()
7✔
120

7✔
121
                mtd, ok := grpc.Method(ctx)
7✔
122
                if !ok {
7✔
UNCOV
123
                        slog.WarnContext(ctx, "failed to get method from context")
×
124
                        return status.Error(codes.Internal, "{groxy} failed to get method from the context")
×
125
                }
×
126

127
                md, ok := metadata.FromIncomingContext(ctx)
7✔
128
                if !ok {
7✔
UNCOV
129
                        md = metadata.New(nil)
×
130
                }
×
131

132
                matches := s.matcher.MatchMetadata(mtd, md)
7✔
133
                if len(matches) == 0 {
7✔
UNCOV
134
                        return next(srv, stream)
×
135
                }
×
136

137
                slog.DebugContext(ctx, "found matches", slog.Any("matches", matches))
7✔
138

7✔
139
                match := matches[0]
7✔
140
                if !matches.NeedsDeeperMatch() {
7✔
UNCOV
141
                        slog.DebugContext(ctx, "matched", slog.Any("match", match))
×
142
                        ctx = context.WithValue(ctx, ctxMatch, match)
×
143
                        return next(srv, grpcx.StreamWithContext(ctx, stream))
×
144
                }
×
145

146
                var firstRecv []byte
7✔
147
                if err := stream.RecvMsg(&firstRecv); err != nil {
7✔
UNCOV
148
                        slog.WarnContext(ctx, "failed to read the first RECV", slogx.Error(err))
×
149
                        return status.Errorf(codes.Internal, "{groxy} failed to read the first RECV: %v", err)
×
150
                }
×
151

152
                ctx = context.WithValue(ctx, ctxFirstRecv, firstRecv)
7✔
153
                if match, ok = matches.MatchMessage(firstRecv); !ok {
7✔
154
                        return next(srv, stream)
×
155
                }
×
156

157
                slog.DebugContext(ctx, "matched", slog.Any("match", match))
7✔
158
                ctx = context.WithValue(ctx, ctxMatch, match)
7✔
159
                return next(srv, grpcx.StreamWithContext(ctx, stream))
7✔
160
        }
161
}
162

163
func (s *Server) mockMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
164
        return func(srv any, stream grpc.ServerStream) error {
8✔
165
                ctx := stream.Context()
7✔
166

7✔
167
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
7✔
168
                if !ok {
7✔
UNCOV
169
                        return next(srv, stream)
×
170
                }
×
171

172
                if match.Mock == nil {
11✔
173
                        return next(srv, stream)
4✔
174
                }
4✔
175

176
                if len(match.Mock.Header) > 0 {
4✔
177
                        if err := stream.SetHeader(match.Mock.Header); err != nil {
1✔
UNCOV
178
                                slog.WarnContext(ctx, "failed to set header to the client", slogx.Error(err))
×
179
                        }
×
180
                }
181

182
                if len(match.Mock.Trailer) > 0 {
3✔
UNCOV
183
                        stream.SetTrailer(match.Mock.Trailer)
×
184
                }
×
185

186
                switch {
3✔
187
                case match.Mock.Body != nil:
1✔
188
                        if err := stream.SendMsg(match.Mock.Body); err != nil {
1✔
189
                                return status.Errorf(codes.Internal, "{groxy} failed to send message: %v", err)
×
190
                        }
×
191
                case match.Mock.Status != nil:
2✔
192
                        return match.Mock.Status.Err()
2✔
193
                default:
×
194
                        return status.Error(codes.Internal, "{groxy} empty mock")
×
195
                }
196

197
                // dump the rest of the stream
198
                for {
2✔
199
                        if err := stream.RecvMsg(nil); err != nil {
2✔
200
                                if errors.Is(err, io.EOF) {
2✔
201
                                        return nil
1✔
202
                                }
1✔
203

UNCOV
204
                                return status.Errorf(codes.Internal, "{groxy} failed to read the rest of the stream: %v", err)
×
205
                        }
206
                }
207
        }
208
}
209

210
func (s *Server) forwardMiddleware(next grpc.StreamHandler) grpc.StreamHandler {
1✔
211
        return func(_ any, stream grpc.ServerStream) error {
5✔
212
                ctx := stream.Context()
4✔
213

4✔
214
                match, ok := ctx.Value(ctxMatch).(*discovery.Rule)
4✔
215
                if !ok || match.Forward == nil {
4✔
UNCOV
216
                        return next(nil, stream)
×
UNCOV
217
                }
×
218

219
                ctx = plantHeader(ctx, match.Forward.Header)
4✔
220

4✔
221
                mtd, _ := grpc.Method(ctx)
4✔
222
                desc := &grpc.StreamDesc{ClientStreams: true, ServerStreams: true}
4✔
223

4✔
224
                upstreamHeader, upstreamTrailer := metadata.New(nil), metadata.New(nil)
4✔
225

4✔
226
                upstream, err := match.Forward.Upstream.NewStream(ctx, desc, mtd,
4✔
227
                        grpc.ForceCodec(grpcx.RawBytesCodec{}),
4✔
228
                        grpc.Header(&upstreamHeader),
4✔
229
                        grpc.Trailer(&upstreamTrailer))
4✔
230
                if err != nil {
4✔
UNCOV
231
                        return status.Errorf(codes.Internal, "{groxy} failed to create upstream: %v", err)
×
UNCOV
232
                }
×
233

234
                if firstRecv, _ := ctx.Value(ctxFirstRecv).([]byte); firstRecv != nil {
8✔
235
                        if err = upstream.SendMsg(firstRecv); err != nil {
4✔
UNCOV
236
                                return status.Errorf(codes.Internal,
×
UNCOV
237
                                        "{groxy} failed to send the first message to the upstream: %v", err)
×
238
                        }
×
239
                }
240

241
                defer func() {
8✔
242
                        stream.SetTrailer(metadata.Join(upstreamHeader, upstreamTrailer))
4✔
243

4✔
244
                        if err = upstream.CloseSend(); err != nil {
4✔
245
                                slog.WarnContext(ctx, "failed to close the upstream",
×
UNCOV
246
                                        slog.String("upstream_name", match.Forward.Upstream.Name()),
×
UNCOV
247
                                        slogx.Error(err))
×
UNCOV
248
                        }
×
249
                }()
250

251
                if err = grpcx.Pipe(upstream, stream); err != nil {
8✔
252
                        if errors.Is(err, io.EOF) {
7✔
253
                                return eofStatus(upstream)
3✔
254
                        }
3✔
255
                        if st := grpcx.StatusFromError(err); st != nil {
2✔
256
                                return st.Err()
1✔
257
                        }
1✔
UNCOV
258
                        slog.WarnContext(ctx, "failed to pipe",
×
UNCOV
259
                                slog.String("upstream_name", match.Forward.Upstream.Name()),
×
UNCOV
260
                                slogx.Error(err))
×
UNCOV
261
                        return status.Errorf(codes.Internal, "{groxy} failed to pipe messages to the upstream")
×
262
                }
263

UNCOV
264
                return nil
×
265
        }
266
}
267

268
func eofStatus(upstream grpc.ClientStream) (err error) {
3✔
269
        if err = upstream.RecvMsg(nil); err == nil {
3✔
UNCOV
270
                return status.Error(codes.Internal, "{groxy} unexpected EOF from the upstream")
×
271
        }
×
272
        if st := grpcx.StatusFromError(err); st != nil {
3✔
UNCOV
273
                return st.Err()
×
UNCOV
274
        }
×
275
        if !errors.Is(err, io.EOF) {
3✔
UNCOV
276
                return status.Errorf(codes.Internal, "{groxy} failed to read the EOF from the upstream: %v", err)
×
277
        }
×
278
        return nil // if there is just EOF then probably everything is fine
3✔
279
}
280

281
func plantHeader(ctx context.Context, header metadata.MD) context.Context {
4✔
282
        outMD, ok := metadata.FromOutgoingContext(ctx)
4✔
283
        if !ok {
4✔
284
                outMD = metadata.New(nil)
×
UNCOV
285
        }
×
286

287
        for k, v := range header {
4✔
UNCOV
288
                if _, ok = outMD[k]; !ok {
×
UNCOV
289
                        outMD[k] = v
×
UNCOV
290
                }
×
291
        }
292

293
        return metadata.NewOutgoingContext(ctx, outMD)
4✔
294
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc