• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Semior001 / groxy / 17194096565

24 Aug 2025 09:37PM UTC coverage: 47.481% (-0.3%) from 47.826%
17194096565

Pull #20

github

Semior001
chore: bumped go version for golangci-lint tool
Pull Request #20: feat: add wait instruction for respond mocker and reorganize example protobuf files

4 of 14 new or added lines in 2 files covered. (28.57%)

10 existing lines in 2 files now uncovered.

1131 of 2382 relevant lines covered (47.48%)

33.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.45
/pkg/proxy/middleware/reflection.go
1
// Package middleware contains middlewares for gRPC unknown handlers.
2
//nolint:staticcheck // this file uses deprecated reflection API
3
package middleware
4

5
import (
6
        "github.com/Semior001/groxy/pkg/discovery"
7
        "google.golang.org/grpc"
8
        "strings"
9
        "fmt"
10
        rapi1alpha "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
11
        rapi1 "google.golang.org/grpc/reflection/grpc_reflection_v1"
12
        "errors"
13
        "io"
14
        "github.com/cappuccinotm/slogx"
15
        "log/slog"
16
        "context"
17
        "golang.org/x/sync/errgroup"
18
        "google.golang.org/grpc/codes"
19
        "github.com/samber/lo"
20
        "google.golang.org/grpc/status"
21
        "sort"
22
        "github.com/Semior001/groxy/pkg/grpcx"
23
)
24

25
// Reflector serves the reflection across multiple upstreams,
26
// by upstreaming the first request to each one and finding the
27
// one that doesn't respond a NotFound status and then piping
28
// the response back to the client.
29
type Reflector struct {
30
        Logger        *slog.Logger
31
        UpstreamsFunc func() []discovery.Upstream
32
}
33

34
// SRIClient is a shorthand for a server reflection stream client.
35
type SRIClient struct {
36
        Name string
37
        rapi1.ServerReflection_ServerReflectionInfoClient
38
}
39

40
// Middleware returns a middleware that reflects the request to the upstreams.
41
//goland:noinspection GoDeprecation
42
func (r Reflector) Middleware(next grpc.StreamHandler) grpc.StreamHandler {
2✔
43
        if r.Logger == nil {
4✔
44
                r.Logger = slog.Default()
2✔
45
        }
2✔
46

47
        return func(srv any, clientStream grpc.ServerStream) error {
5✔
48
                ctx := clientStream.Context()
3✔
49

3✔
50
                method, ok := grpc.Method(ctx)
3✔
51
                if !ok || !strings.HasPrefix(method, "/grpc.reflection.v1") {
3✔
52
                        return next(srv, clientStream)
×
53
                }
×
54

55
                alpha := strings.HasPrefix(method, "/grpc.reflection.v1alpha")
3✔
56

3✔
57
                upstreams := r.UpstreamsFunc()
3✔
58
                for idx, up := range upstreams { // skip the reflection-disabled upstreams
8✔
59
                        if !up.Reflection() {
5✔
60
                                upstreams = append(upstreams[:idx], upstreams[idx+1:]...)
×
61
                        }
×
62
                }
63

64
                clients := make([]SRIClient, len(upstreams))
3✔
65
                defer func() {
6✔
66
                        for _, client := range clients {
8✔
67
                                if client.ServerReflection_ServerReflectionInfoClient == nil {
5✔
68
                                        continue
×
69
                                }
70

71
                                if err := client.CloseSend(); err != nil {
5✔
72
                                        r.Logger.WarnContext(ctx, "failed to close the stream to upstream",
×
73
                                                slog.String("upstream", client.Name),
×
74
                                                slogx.Error(err))
×
75
                                        continue
×
76
                                }
77

78
                                r.Logger.DebugContext(ctx, "closed the stream to upstream",
5✔
79
                                        slog.String("upstream", client.Name))
5✔
80
                        }
81
                }()
82
                for idx, upstream := range upstreams {
8✔
83
                        cl, err := rapi1.NewServerReflectionClient(upstream).ServerReflectionInfo(ctx)
5✔
84
                        if err != nil {
5✔
85
                                r.Logger.WarnContext(ctx, "failed to make a new stream to upstream",
×
86
                                        slog.String("upstream", upstream.Name()),
×
87
                                        slog.String("target", upstream.Target()),
×
88
                                        slogx.Error(err))
×
89
                                return status.Errorf(codes.Internal, "{groxy} can't make a new stream to upstream %s",
×
90
                                        upstream.Name())
×
91
                        }
×
92

93
                        clients[idx] = SRIClient{
5✔
94
                                Name: upstream.Name(),
5✔
95
                                ServerReflection_ServerReflectionInfoClient: cl,
5✔
96
                        }
5✔
97

5✔
98
                        r.Logger.DebugContext(ctx, "brought up a new stream to upstream",
5✔
99
                                slog.String("upstream", upstream.Name()),
5✔
100
                                slog.String("target", upstream.Target()))
5✔
101
                }
102

103
                for {
10✔
104
                        recv := any(&rapi1.ServerReflectionRequest{})
7✔
105
                        if alpha {
10✔
106
                                recv = &rapi1alpha.ServerReflectionRequest{}
3✔
107
                        }
3✔
108

109
                        if err := clientStream.RecvMsg(recv); err != nil {
9✔
110
                                if errors.Is(err, io.EOF) {
2✔
111
                                        return nil
×
112
                                }
×
113

114
                                r.Logger.WarnContext(ctx, "failed to receive message", slogx.Error(err))
2✔
115
                                return status.Error(codes.Internal, "{groxy} failed to receive message")
2✔
116
                        }
117

118
                        resp, err := r.reflect(ctx, r.asV1Request(recv), clients)
5✔
119
                        if err != nil {
6✔
120
                                if st := grpcx.StatusFromError(err); st != nil && grpcx.ClientCode(st.Code()) {
2✔
121
                                        return status.Errorf(st.Code(), "{groxy} received from one of upstreams: %s", st.Message())
1✔
122
                                }
1✔
123
                                r.Logger.WarnContext(ctx, "failed to reflect", slogx.Error(err))
×
124
                                return status.Error(codes.Internal, "{groxy} failed to reflect")
×
125
                        }
126

127
                        result := any(resp)
4✔
128
                        if alpha {
6✔
129
                                result = r.asV1AlphaResponse(recv, resp)
2✔
130
                        }
2✔
131

132
                        if err = clientStream.SendMsg(result); err != nil {
4✔
133
                                r.Logger.WarnContext(ctx, "failed to send message", slogx.Error(err))
×
134
                                return status.Error(codes.Internal, "{groxy} failed to send message")
×
135
                        }
×
136
                }
137
        }
138
}
139

140
func (r Reflector) reflect(
141
        ctx context.Context,
142
        req *rapi1.ServerReflectionRequest,
143
        ups []SRIClient,
144
) (*rapi1.ServerReflectionResponse, error) {
5✔
145
        resps := make([]*rapi1.ServerReflectionResponse, len(ups))
5✔
146
        ewg, ctx := errgroup.WithContext(ctx)
5✔
147
        for idx, up := range ups {
14✔
148
                idx, up := idx, up
9✔
149
                ewg.Go(func() error {
18✔
150
                        if err := up.Send(req); err != nil {
9✔
UNCOV
151
                                if errors.Is(err, io.EOF) {
×
UNCOV
152
                                        _, rerr := up.Recv()
×
UNCOV
153
                                        err = errors.Join(err, fmt.Errorf("recv after EOF: %w", rerr))
×
UNCOV
154
                                }
×
UNCOV
155
                                r.Logger.WarnContext(ctx, "failed to send reflection request",
×
UNCOV
156
                                        slog.String("upstream", up.Name),
×
UNCOV
157
                                        slogx.Error(err),
×
UNCOV
158
                                        slog.Any("trailers", up.Trailer()))
×
UNCOV
159
                                return fmt.Errorf("send request to upstream: %w", err)
×
160
                        }
161

162
                        resp, err := up.Recv()
9✔
163
                        if err != nil {
10✔
164
                                r.Logger.WarnContext(ctx, "failed to send reflection request",
1✔
165
                                        slog.String("upstream", up.Name),
1✔
166
                                        slogx.Error(err))
1✔
167
                                return fmt.Errorf("receive from upstream: %w", err)
1✔
168
                        }
1✔
169

170
                        if eresp := resp.GetErrorResponse(); eresp != nil && eresp.ErrorCode != int32(codes.NotFound) {
8✔
171
                                r.Logger.WarnContext(ctx, "received error response from upstream",
×
172
                                        slog.String("upstream", up.Name),
×
173
                                        slog.String("error_message", eresp.ErrorMessage),
×
174
                                        slog.Int("error_code", int(eresp.ErrorCode)))
×
175
                                return fmt.Errorf("error response from upstream: %s", eresp.ErrorMessage)
×
176
                        }
×
177

178
                        resps[idx] = resp
8✔
179
                        return nil
8✔
180
                })
181
        }
182
        if err := ewg.Wait(); err != nil {
6✔
183
                return nil, fmt.Errorf("reflect: %w", err)
1✔
184
        }
1✔
185

186
        return r.mergeResponses(ctx, req, resps)
4✔
187
}
188

189
func (r Reflector) mergeResponses(
190
        ctx context.Context,
191
        req *rapi1.ServerReflectionRequest,
192
        resps []*rapi1.ServerReflectionResponse,
193
) (*rapi1.ServerReflectionResponse, error) {
4✔
194
        result := &rapi1.ServerReflectionResponse{OriginalRequest: req}
4✔
195
        switch req.MessageRequest.(type) {
4✔
196
        case *rapi1.ServerReflectionRequest_FileByFilename,
197
                *rapi1.ServerReflectionRequest_FileContainingExtension,
198
                *rapi1.ServerReflectionRequest_FileContainingSymbol:
2✔
199
                r.mergeDescriptorResponses(ctx, resps, result)
2✔
200
        case *rapi1.ServerReflectionRequest_ListServices:
2✔
201
                r.mergeServiceResponses(ctx, resps, result)
2✔
202
        case *rapi1.ServerReflectionRequest_AllExtensionNumbersOfType:
×
203
                r.respondAllExtensionNumbersOfType(resps, result)
×
204
        default:
×
205
                return nil, fmt.Errorf("unexpected message request: %T", req.MessageRequest)
×
206
        }
207

208
        if result.MessageResponse == nil {
4✔
209
                return &rapi1.ServerReflectionResponse{
×
210
                        MessageResponse: &rapi1.ServerReflectionResponse_ErrorResponse{
×
211
                                ErrorResponse: &rapi1.ErrorResponse{
×
212
                                        ErrorCode:    int32(codes.NotFound),
×
213
                                        ErrorMessage: "{groxy} didn't find any response among the upstreams",
×
214
                                },
×
215
                        },
×
216
                }, nil
×
217
        }
×
218

219
        return result, nil
4✔
220
}
221

222
func (r Reflector) mergeDescriptorResponses(
223
        ctx context.Context,
224
        resps []*rapi1.ServerReflectionResponse,
225
        resp *rapi1.ServerReflectionResponse,
226
) {
2✔
227
        result := &rapi1.ServerReflectionResponse_FileDescriptorResponse{
2✔
228
                FileDescriptorResponse: &rapi1.FileDescriptorResponse{},
2✔
229
        }
2✔
230

2✔
231
        for _, resp := range resps {
6✔
232
                if resp == nil {
4✔
233
                        continue
×
234
                }
235

236
                if eresp := resp.GetErrorResponse(); eresp != nil {
4✔
237
                        continue
×
238
                }
239

240
                fdresp := resp.GetFileDescriptorResponse()
4✔
241
                if fdresp == nil {
4✔
242
                        r.Logger.WarnContext(ctx, "unexpected response type",
×
243
                                slog.String("response_type", fmt.Sprintf("%T", resp.MessageResponse)))
×
244
                        continue
×
245
                }
246

247
                result.FileDescriptorResponse.FileDescriptorProto = append(result.FileDescriptorResponse.FileDescriptorProto,
4✔
248
                        fdresp.FileDescriptorProto...)
4✔
249
        }
250

251
        if len(result.FileDescriptorResponse.FileDescriptorProto) == 0 {
2✔
252
                return
×
253
        }
×
254

255
        resp.MessageResponse = result
2✔
256
}
257

258
func (r Reflector) mergeServiceResponses(
259
        ctx context.Context,
260
        resps []*rapi1.ServerReflectionResponse,
261
        result *rapi1.ServerReflectionResponse,
262
) {
2✔
263
        services := map[string]struct{}{}
2✔
264

2✔
265
        for _, resp := range resps {
6✔
266
                if eresp := resp.GetErrorResponse(); eresp != nil {
4✔
267
                        continue
×
268
                }
269

270
                sresp := resp.GetListServicesResponse()
4✔
271
                if sresp == nil {
4✔
272
                        r.Logger.WarnContext(ctx, "unexpected response type",
×
273
                                slog.String("response_type", fmt.Sprintf("%T", resp.MessageResponse)))
×
274
                        continue
×
275
                }
276

277
                for _, service := range sresp.Service {
16✔
278
                        if _, ok := services[service.Name]; ok {
16✔
279
                                r.Logger.WarnContext(ctx, "duplicate service reflected",
4✔
280
                                        slog.String("service", service.Name))
4✔
281
                        }
4✔
282
                        services[service.Name] = struct{}{}
12✔
283
                }
284
        }
285

286
        if len(services) == 0 {
2✔
287
                return
×
288
        }
×
289

290
        resp := &rapi1.ServerReflectionResponse_ListServicesResponse{
2✔
291
                ListServicesResponse: &rapi1.ListServiceResponse{
2✔
292
                        Service: lo.Map(lo.Keys(services), func(service string, _ int) *rapi1.ServiceResponse {
10✔
293
                                return &rapi1.ServiceResponse{Name: service}
8✔
294
                        }),
8✔
295
                },
296
        }
297

298
        sort.Slice(resp.ListServicesResponse.Service, func(i, j int) bool {
12✔
299
                return resp.ListServicesResponse.Service[i].Name < resp.ListServicesResponse.Service[j].Name
10✔
300
        })
10✔
301

302
        result.MessageResponse = resp
2✔
303
}
304

305
// respondAllExtensionNumbersOfType returns the first non-error response.
306
func (r Reflector) respondAllExtensionNumbersOfType(
307
        resps []*rapi1.ServerReflectionResponse,
308
        result *rapi1.ServerReflectionResponse,
309
) {
×
310
        for _, resp := range resps {
×
311
                if eresp := resp.GetErrorResponse(); eresp != nil {
×
312
                        continue
×
313
                }
314

315
                result.MessageResponse = resp.MessageResponse
×
316
                return
×
317
        }
318
}
319

320
//goland:noinspection GoDeprecation
321
func (r Reflector) asV1Request(recv any) *rapi1.ServerReflectionRequest {
5✔
322
        msg, ok := recv.(*rapi1alpha.ServerReflectionRequest)
5✔
323
        if !ok {
8✔
324
                return recv.(*rapi1.ServerReflectionRequest)
3✔
325
        }
3✔
326

327
        result := &rapi1.ServerReflectionRequest{Host: msg.Host}
2✔
328

2✔
329
        switch req := msg.MessageRequest.(type) {
2✔
330
        case *rapi1alpha.ServerReflectionRequest_FileByFilename:
×
331
                result.MessageRequest = &rapi1.ServerReflectionRequest_FileByFilename{
×
332
                        FileByFilename: req.FileByFilename,
×
333
                }
×
334
        case *rapi1alpha.ServerReflectionRequest_FileContainingSymbol:
1✔
335
                result.MessageRequest = &rapi1.ServerReflectionRequest_FileContainingSymbol{
1✔
336
                        FileContainingSymbol: req.FileContainingSymbol,
1✔
337
                }
1✔
338
        case *rapi1alpha.ServerReflectionRequest_FileContainingExtension:
×
339
                result.MessageRequest = &rapi1.ServerReflectionRequest_FileContainingExtension{
×
340
                        FileContainingExtension: &rapi1.ExtensionRequest{
×
341
                                ContainingType:  req.FileContainingExtension.ContainingType,
×
342
                                ExtensionNumber: req.FileContainingExtension.ExtensionNumber,
×
343
                        },
×
344
                }
×
345
        case *rapi1alpha.ServerReflectionRequest_AllExtensionNumbersOfType:
×
346
                result.MessageRequest = &rapi1.ServerReflectionRequest_AllExtensionNumbersOfType{
×
347
                        AllExtensionNumbersOfType: req.AllExtensionNumbersOfType,
×
348
                }
×
349
        case *rapi1alpha.ServerReflectionRequest_ListServices:
1✔
350
                result.MessageRequest = &rapi1.ServerReflectionRequest_ListServices{
1✔
351
                        ListServices: req.ListServices,
1✔
352
                }
1✔
353
        default:
×
354
                panic(fmt.Sprintf("unexpected message request: %T", req))
×
355
        }
356

357
        return result
2✔
358
}
359

360
//goland:noinspection ALL
361
func (r Reflector) asV1AlphaResponse(req any, resp *rapi1.ServerReflectionResponse) any {
2✔
362
        result := &rapi1alpha.ServerReflectionResponse{OriginalRequest: req.(*rapi1alpha.ServerReflectionRequest)}
2✔
363

2✔
364
        switch resp := resp.MessageResponse.(type) {
2✔
365
        case *rapi1.ServerReflectionResponse_ErrorResponse:
×
366
                result.MessageResponse = &rapi1alpha.ServerReflectionResponse_ErrorResponse{
×
367
                        ErrorResponse: &rapi1alpha.ErrorResponse{
×
368
                                ErrorCode:    resp.ErrorResponse.ErrorCode,
×
369
                                ErrorMessage: resp.ErrorResponse.ErrorMessage,
×
370
                        },
×
371
                }
×
372
        case *rapi1.ServerReflectionResponse_FileDescriptorResponse:
1✔
373
                result.MessageResponse = &rapi1alpha.ServerReflectionResponse_FileDescriptorResponse{
1✔
374
                        FileDescriptorResponse: &rapi1alpha.FileDescriptorResponse{
1✔
375
                                FileDescriptorProto: resp.FileDescriptorResponse.FileDescriptorProto,
1✔
376
                        },
1✔
377
                }
1✔
378
        case *rapi1.ServerReflectionResponse_ListServicesResponse:
1✔
379
                result.MessageResponse = &rapi1alpha.ServerReflectionResponse_ListServicesResponse{
1✔
380
                        ListServicesResponse: &rapi1alpha.ListServiceResponse{
1✔
381
                                Service: lo.Map(resp.ListServicesResponse.Service,
1✔
382
                                        func(svc *rapi1.ServiceResponse, _ int) *rapi1alpha.ServiceResponse {
5✔
383
                                                return &rapi1alpha.ServiceResponse{Name: svc.Name}
4✔
384
                                        }),
4✔
385
                        },
386
                }
387
        default:
×
388
                panic(fmt.Sprintf("unexpected message response: %T", resp))
×
389
        }
390

391
        return result
2✔
392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc