• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

singnet / snet-daemon / #659

18 Aug 2025 01:56PM UTC coverage: 36.314% (-1.7%) from 38.001%
#659

push

web-flow
chore: fix some minor issues in comments (#641)

Signed-off-by: longhutianjie <keplrnewton@icloud.com>
Co-authored-by: Semyon Novikov <semyon.novikov@singularitynet.io>

5702 of 15702 relevant lines covered (36.31%)

3.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.23
/handler/grpc.go
1
package handler
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "net/http"
11
        "net/url"
12
        "os/exec"
13
        "strings"
14

15
        "github.com/bufbuild/protocompile/linker"
16
        "github.com/gorilla/rpc/v2/json2"
17
        "go.uber.org/zap"
18
        "google.golang.org/grpc"
19
        "google.golang.org/grpc/codes"
20
        "google.golang.org/grpc/credentials"
21
        "google.golang.org/grpc/credentials/insecure"
22
        "google.golang.org/grpc/metadata"
23
        "google.golang.org/grpc/status"
24
        "google.golang.org/protobuf/encoding/protojson"
25
        "google.golang.org/protobuf/proto"
26
        "google.golang.org/protobuf/reflect/protoreflect"
27
        "google.golang.org/protobuf/types/dynamicpb"
28

29
        "github.com/singnet/snet-daemon/v6/blockchain"
30
        "github.com/singnet/snet-daemon/v6/codec"
31
        "github.com/singnet/snet-daemon/v6/config"
32
        "github.com/singnet/snet-daemon/v6/errs"
33
)
34

35
// grpcDesc is the stream descriptor used when opening the outgoing stream to
// the backend in grpcToGRPC. Both directions are marked as streaming so the
// same descriptor can be used for every proxied method.
var grpcDesc = &grpc.StreamDesc{
	ClientStreams: true,
	ServerStreams: true,
}
36

37
type grpcHandler struct {
38
        grpcConn            *grpc.ClientConn
39
        grpcModelConn       *grpc.ClientConn
40
        options             grpc.DialOption
41
        enc                 string
42
        passthroughEndpoint string
43
        //modelTrainingEndpoint string
44
        executable         string
45
        serviceMetaData    *blockchain.ServiceMetadata
46
        serviceCredentials serviceCredentials
47
}
48

49
func (g grpcHandler) GrpcConn(isModelTraining bool) *grpc.ClientConn {
1✔
50
        if isModelTraining {
1✔
51
                return g.grpcModelConn
×
52
        }
×
53

54
        return g.grpcConn
1✔
55
}
56

57
func NewGrpcHandler(serviceMetadata *blockchain.ServiceMetadata) grpc.StreamHandler {
1✔
58
        passthroughEnabled := config.GetBool(config.PassthroughEnabledKey)
1✔
59

1✔
60
        if !passthroughEnabled {
1✔
61
                return grpcLoopback
×
62
        }
×
63

64
        h := grpcHandler{
1✔
65
                serviceMetaData:     serviceMetadata,
1✔
66
                enc:                 serviceMetadata.GetWireEncoding(),
1✔
67
                passthroughEndpoint: config.GetString(config.ServiceEndpointKey),
1✔
68
                //modelTrainingEndpoint: config.GetString(config.ModelTrainingEndpoint),
1✔
69
                executable: config.GetString(config.ExecutablePathKey),
1✔
70
                options: grpc.WithDefaultCallOptions(
1✔
71
                        grpc.MaxCallRecvMsgSize(config.GetInt(config.MaxMessageSizeInMB)*1024*1024),
1✔
72
                        grpc.MaxCallSendMsgSize(config.GetInt(config.MaxMessageSizeInMB)*1024*1024)),
1✔
73
        }
1✔
74

1✔
75
        switch serviceMetadata.GetServiceType() {
1✔
76
        case "grpc":
1✔
77
                h.grpcConn = h.getConnection(h.passthroughEndpoint)
1✔
78
                //if config.GetBool(config.ModelTrainingEnabled) {
1✔
79
                //        h.grpcModelConn = h.getConnection(h.modelTrainingEndpoint)
1✔
80
                //}
1✔
81
                return h.grpcToGRPC
1✔
82
        case "jsonrpc":
×
83
                return h.grpcToJSONRPC
×
84
        case "http":
×
85
                h.serviceCredentials = serviceCredentials{}
×
86
                err := config.Vip().UnmarshalKey(config.ServiceCredentialsKey, &h.serviceCredentials)
×
87
                if err != nil {
×
88
                        zap.L().Fatal("invalid config", zap.Error(fmt.Errorf("%v%v", err, errs.ErrDescURL(errs.InvalidServiceCredentials))))
×
89
                }
×
90
                err = h.serviceCredentials.validate()
×
91
                if err != nil {
×
92
                        zap.L().Fatal("invalid config", zap.Error(fmt.Errorf("%v%v", err, errs.ErrDescURL(errs.InvalidServiceCredentials))))
×
93
                }
×
94
                return h.grpcToHTTP
×
95
        case "process":
×
96
                return h.grpcToProcess
×
97
        }
98
        return nil
×
99
}
100

101
func (srvCreds serviceCredentials) validate() error {
5✔
102
        if len(srvCreds) > 0 {
10✔
103
                for _, v := range srvCreds {
10✔
104
                        if v.Location != body && v.Location != header && v.Location != query {
7✔
105
                                return fmt.Errorf("invalid service_credentials: location should be body, header or query")
2✔
106
                        }
2✔
107
                        if v.Key == "" {
4✔
108
                                return fmt.Errorf("invalid service_credentials: key can't be empty")
1✔
109
                        }
1✔
110
                }
111
        }
112
        return nil
2✔
113
}
114

115
func (g grpcHandler) getConnection(endpoint string) (conn *grpc.ClientConn) {
1✔
116

1✔
117
        if !strings.Contains(endpoint, "://") {
1✔
118
                endpoint = "grpc" + "://" + endpoint
×
119
        }
×
120

121
        passthroughURL, err := url.Parse(endpoint)
1✔
122
        if err != nil || passthroughURL == nil {
1✔
123
                zap.L().Fatal(fmt.Sprintf("can't parse service_endpoint %v", errs.ErrDescURL(errs.InvalidConfig)), zap.String("endpoint", endpoint))
×
124
        }
×
125
        if strings.Compare(passthroughURL.Scheme, "https") == 0 {
1✔
126
                conn, err = grpc.NewClient(passthroughURL.Host,
×
127
                        grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), g.options)
×
128
                if err != nil {
×
129
                        zap.L().Panic("error dialing service", zap.Error(err))
×
130
                }
×
131
                return conn
×
132
        }
133
        conn, err = grpc.NewClient(passthroughURL.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), g.options)
1✔
134
        if err != nil {
1✔
135
                zap.L().Panic("error dialing service", zap.Error(err))
×
136
        }
×
137
        return conn
1✔
138
}
139

140
/*
141
Modified from https://github.com/mwitkow/grpc-proxy/blob/67591eb23c48346a480470e462289835d96f70da/proxy/handler.go#L61
142
Original Copyright 2017 Michal Witkowski. All Rights Reserved. See LICENSE-GRPC-PROXY for licensing terms.
143
Modifications Copyright 2018 SingularityNET Foundation. All Rights Reserved. See LICENSE for licensing terms.
144
*/
145
func (g grpcHandler) grpcToGRPC(srv any, inStream grpc.ServerStream) error {
1✔
146
        method, ok := grpc.MethodFromServerStream(inStream)
1✔
147
        if !ok {
1✔
148
                return status.Errorf(codes.Internal, "could not determine method from server stream")
×
149
        }
×
150

151
        inCtx := inStream.Context()
1✔
152
        md, ok := metadata.FromIncomingContext(inCtx)
1✔
153
        if !ok {
1✔
154
                return status.Errorf(codes.Internal, "could not get metadata from incoming context")
×
155
        }
×
156

157
        outCtx, outCancel := context.WithCancel(inCtx)
1✔
158
        defer outCancel()
1✔
159
        outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
1✔
160
        isModelTraining := g.serviceMetaData.IsModelTraining(method)
1✔
161
        outStream, err := g.GrpcConn(isModelTraining).NewStream(outCtx, grpcDesc, method, grpc.CallContentSubtype(g.enc))
1✔
162
        if err != nil {
1✔
163
                return status.Errorf(codes.Internal, "can't connect to service %v%v", err, errs.ErrDescURL(errs.ServiceUnavailable))
×
164
        }
×
165

166
        s2cErrChan := forwardServerToClient(inStream, outStream)
1✔
167
        c2sErrChan := forwardClientToServer(outStream, inStream)
1✔
168

1✔
169
        for i := 0; i < 2; i++ {
3✔
170
                select {
2✔
171
                case s2cErr := <-s2cErrChan:
1✔
172
                        if s2cErr == io.EOF {
2✔
173
                                // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
1✔
174
                                // the clientStream>inStream may continue pumping though.
1✔
175
                                errCloseSend := outStream.CloseSend()
1✔
176
                                if errCloseSend != nil {
1✔
177
                                        zap.L().Debug("failed close outStream", zap.Error(err))
×
178
                                }
×
179
                                break
1✔
180
                        } else {
×
181
                                // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
×
182
                                // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
×
183
                                // exit with an error to the stack
×
184
                                outCancel()
×
185
                                return status.Errorf(codes.Internal, "failed proxying s2c: %v%s", s2cErr, errs.ErrDescURL(errs.ServiceUnavailable))
×
186
                        }
×
187
                case c2sErr := <-c2sErrChan:
1✔
188
                        // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
1✔
189
                        // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
1✔
190
                        // will be nil.
1✔
191
                        inStream.SetTrailer(outStream.Trailer())
1✔
192
                        // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
1✔
193
                        if c2sErr != io.EOF {
2✔
194
                                return c2sErr
1✔
195
                        }
1✔
196
                        return nil
×
197
                }
198
        }
199
        return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
×
200
}
201

202
/*
203
Modified from https://github.com/mwitkow/grpc-proxy/blob/67591eb23c48346a480470e462289835d96f70da/proxy/handler.go#L115
204
Original Copyright 2017 Michal Witkowski. All Rights Reserved. See LICENSE-GRPC-PROXY for licensing terms.
205
Modifications Copyright 2018 SingularityNET Foundation. All Rights Reserved. See LICENSE for licensing terms.
206
*/
207
func forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
1✔
208
        ret := make(chan error, 1)
1✔
209
        go func() {
2✔
210
                f := &codec.GrpcFrame{}
1✔
211
                for i := 0; ; i++ {
3✔
212
                        if err := src.RecvMsg(f); err != nil {
3✔
213
                                ret <- err // this can be io.EOF which is happy case
1✔
214
                                break
1✔
215
                        }
216
                        if i == 0 {
2✔
217
                                // This is a bit of a hack, but client to server headers are only readable after first client msg is
1✔
218
                                // received but must be written to server stream before the first msg is flushed.
1✔
219
                                // This is the only place to do it nicely.
1✔
220
                                md, err := src.Header()
1✔
221
                                if err != nil {
1✔
222
                                        ret <- err
×
223
                                        break
×
224
                                }
225
                                if err := dst.SendHeader(md); err != nil {
1✔
226
                                        ret <- err
×
227
                                        break
×
228
                                }
229
                        }
230
                        if err := dst.SendMsg(f); err != nil {
1✔
231
                                ret <- err
×
232
                                break
×
233
                        }
234
                }
235
        }()
236
        return ret
1✔
237
}
238

239
/*
240
Modified from https://github.com/mwitkow/grpc-proxy/blob/67591eb23c48346a480470e462289835d96f70da/proxy/handler.go#L147
241
Original Copyright 2017 Michal Witkowski. All Rights Reserved. See LICENSE-GRPC-PROXY for licensing terms.
242
Modifications Copyright 2018 SingularityNET Foundation. All Rights Reserved. See LICENSE for licensing terms.
243
*/
244
func forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
1✔
245
        ret := make(chan error, 1)
1✔
246
        go func() {
2✔
247
                f := &codec.GrpcFrame{}
1✔
248
                for i := 0; ; i++ {
3✔
249
                        //Only for the first time do this, once RecvMsg has been called,
2✔
250
                        //future calls will result in io.EOF , we want to retrieve the
2✔
251
                        // first message sent by the client and pass this on the regular service call
2✔
252
                        //This is done to be able to make calls to support regular Service call + Dynamic pricing call
2✔
253
                        if i == 0 {
3✔
254
                                // todo we need to think through to determine price for every call on stream calls
1✔
255
                                // will be handled when we support streaming and pricing across all clients in snet-platform
1✔
256
                                if wrappedStream, ok := src.(*WrapperServerStream); ok {
1✔
257
                                        f = (wrappedStream.OriginalRecvMsg()).(*codec.GrpcFrame)
×
258
                                } else if err := src.RecvMsg(f); err != nil {
1✔
259
                                        ret <- err
×
260
                                        break
×
261
                                }
262
                        } else if err := src.RecvMsg(f); err != nil {
2✔
263
                                ret <- err // this can be io.EOF which is happy case
1✔
264
                                break
1✔
265
                        }
266
                        if err := dst.SendMsg(f); err != nil {
1✔
267
                                ret <- err
×
268
                                break
×
269
                        }
270
                }
271
        }()
272
        return ret
1✔
273
}
274

275
// httpLocation names the part of an outgoing HTTP request in which a service
// credential is placed.
type httpLocation string

// Supported credential locations.
var (
	query  httpLocation = "query"
	body   httpLocation = "body"
	header httpLocation = "header"
)

// serviceCredential is one key/value pair that is added to outgoing HTTP
// requests at the given location.
type serviceCredential struct {
	Key      string       `json:"key"`
	Value    any          `json:"value"`
	Location httpLocation `json:"location"`
}

// serviceCredentials is the list of credentials configured for a service.
type serviceCredentials []serviceCredential
288

289
func (g grpcHandler) grpcToHTTP(srv any, inStream grpc.ServerStream) error {
×
290
        method, ok := grpc.MethodFromServerStream(inStream)
×
291
        if !ok {
×
292
                return status.Errorf(codes.Internal, "could not determine method from server stream")
×
293
        }
×
294

295
        methodSegs := strings.Split(method, "/")
×
296
        method = methodSegs[len(methodSegs)-1]
×
297

×
298
        zap.L().Info("Calling method", zap.String("method", method))
×
299

×
300
        f := &codec.GrpcFrame{}
×
301
        if err := inStream.RecvMsg(f); err != nil {
×
302
                zap.L().Error(fmt.Sprintf("error receiving grpc msg: %v%v", err, errs.ErrDescURL(errs.ReceiveMsgError)))
×
303
                return status.Errorf(codes.Internal, "error receiving grpc msg: %v%v", err, errs.ErrDescURL(errs.ReceiveMsgError))
×
304
        }
×
305

306
        // convert proto msg to json
307
        jsonBody, err := protoToJson(g.serviceMetaData.ProtoDescriptors, f.Data, method)
×
308
        if err != nil {
×
309
                return status.Errorf(codes.Internal, "protoToJson error: %+v", errs.ErrDescURL(errs.InvalidProto))
×
310
        }
×
311

312
        zap.L().Debug("Proto to json result", zap.String("json", string(jsonBody)))
×
313

×
314
        base, err := url.Parse(g.passthroughEndpoint)
×
315
        if err != nil {
×
316
                zap.L().Error("can't parse passthroughEndpoint", zap.Error(err))
×
317
                return status.Errorf(codes.Internal, "can't parse service_endpoint %v%v", err, errs.ErrDescURL(errs.InvalidConfig))
×
318
        }
×
319

320
        base.Path += method // method from proto should be the same as http handler path
×
321

×
322
        params := url.Values{}
×
323
        headers := http.Header{}
×
324

×
325
        var bodyMap = map[string]any{}
×
326
        errJson := json.Unmarshal(jsonBody, &bodyMap)
×
327

×
328
        for _, cred := range g.serviceCredentials {
×
329
                switch cred.Location {
×
330
                case query:
×
331
                        v, ok := cred.Value.(string)
×
332
                        if ok {
×
333
                                params.Add(cred.Key, v)
×
334
                        }
×
335
                case body:
×
336
                        if errJson == nil {
×
337
                                bodyMap[cred.Key] = cred.Value
×
338
                        }
×
339
                case header:
×
340
                        v, ok := cred.Value.(string)
×
341
                        if ok {
×
342
                                headers.Set(cred.Key, v)
×
343
                        }
×
344
                }
345
        }
346

347
        if errJson == nil {
×
348
                newJson, err := json.Marshal(bodyMap)
×
349
                if err == nil {
×
350
                        jsonBody = newJson
×
351
                } else {
×
352
                        zap.L().Debug("Can't marshal json", zap.Error(err))
×
353
                }
×
354
        }
355

356
        base.RawQuery = params.Encode()
×
357
        zap.L().Debug("Calling http service",
×
358
                zap.String("url", base.String()),
×
359
                zap.String("body", string(jsonBody)),
×
360
                zap.String("method", "POST"))
×
361

×
362
        httpReq, err := http.NewRequest("POST", base.String(), bytes.NewBuffer(jsonBody))
×
363
        if err != nil {
×
364
                return status.Errorf(codes.Internal, "error creating http request: %+v%v", err, errs.ErrDescURL(errs.HTTPRequestBuildError))
×
365
        }
×
366
        httpReq.Header = headers
×
367
        httpReq.Header.Set("content-type", "application/json")
×
368

×
369
        httpResp, err := http.DefaultClient.Do(httpReq)
×
370
        if err != nil {
×
371
                return status.Errorf(codes.Internal, "error executing HTTP service: %+v%v", err, errs.ErrDescURL(errs.ServiceUnavailable))
×
372
        }
×
373
        resp, err := io.ReadAll(httpResp.Body)
×
374
        if err != nil {
×
375
                return status.Errorf(codes.Internal, "error reading response from HTTP service: %+v%v", err, errs.ErrDescURL(errs.ServiceUnavailable))
×
376
        }
×
377
        zap.L().Debug("Response from HTTP service", zap.String("response", string(resp)))
×
378

×
379
        protoMessage, errMarshal := jsonToProto(g.serviceMetaData.ProtoDescriptors, resp, method)
×
380
        if errMarshal != nil {
×
381
                return status.Errorf(codes.Internal, "jsonToProto error: %+v%v", errMarshal, errs.ErrDescURL(errs.InvalidProto))
×
382
        }
×
383

384
        if err = inStream.SendMsg(protoMessage); err != nil {
×
385
                return status.Errorf(codes.Internal, "error sending response from HTTP service: %+v", err)
×
386
        }
×
387

388
        return nil
×
389
}
390

391
func findMethodInProto(protoFiles linker.Files, methodName string) (method protoreflect.MethodDescriptor) {
5✔
392
        for _, protoFile := range protoFiles {
12✔
393
                if protoFile.Services().Len() == 0 {
7✔
394
                        continue
×
395
                }
396

397
                for i := 0; i < protoFile.Services().Len(); i++ {
15✔
398
                        service := protoFile.Services().Get(i)
8✔
399
                        if service == nil {
8✔
400
                                continue
×
401
                        }
402

403
                        method = service.Methods().ByName(protoreflect.Name(methodName))
8✔
404
                        if method != nil {
11✔
405
                                return method
3✔
406
                        }
3✔
407
                }
408
        }
409
        return nil
2✔
410
}
411

412
func jsonToProto(protoFiles linker.Files, json []byte, methodName string) (proto proto.Message, err error) {
2✔
413

2✔
414
        method := findMethodInProto(protoFiles, methodName)
2✔
415
        if method == nil {
3✔
416
                zap.L().Error("[jsonToProto] method not found in proto for http call")
1✔
417
                return proto, errors.New("method in proto not found")
1✔
418
        }
1✔
419

420
        output := method.Output()
1✔
421
        zap.L().Debug("output msg descriptor", zap.String("fullname", string(output.FullName())))
1✔
422
        proto = dynamicpb.NewMessage(output)
1✔
423
        err = protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}.Unmarshal(json, proto)
1✔
424
        if err != nil {
1✔
425
                zap.L().Error("can't unmarshal json to proto", zap.Error(err))
×
426
                return proto, fmt.Errorf("invalid proto, can't convert json to proto msg: %+v", err)
×
427
        }
×
428

429
        return proto, nil
1✔
430
}
431

432
func protoToJson(protoFiles linker.Files, in []byte, methodName string) (json []byte, err error) {
×
433

×
434
        method := findMethodInProto(protoFiles, methodName)
×
435
        if method == nil {
×
436
                zap.L().Error("[protoToJson] method not found in proto for http call")
×
437
                return []byte("error, method in proto not found"), errors.New("method in proto not found")
×
438
        }
×
439

440
        input := method.Input()
×
441
        zap.L().Debug("[protoToJson]", zap.Any("methodName", input.FullName()))
×
442
        msg := dynamicpb.NewMessage(input)
×
443
        err = proto.Unmarshal(in, msg)
×
444
        if err != nil {
×
445
                zap.L().Error("Error in unmarshalling", zap.Error(err))
×
446
                return []byte("error, invalid proto file or input request"), fmt.Errorf("error in unmarshaling proto to json: %+v", err)
×
447
        }
×
448
        json, err = protojson.MarshalOptions{UseProtoNames: true}.Marshal(msg)
×
449
        if err != nil {
×
450
                zap.L().Error("Error in marshaling", zap.Error(err))
×
451
                return []byte("error, invalid proto file or input request"), fmt.Errorf("error in marshaling proto to json: %+v", err)
×
452
        }
×
453
        zap.L().Debug("ProtoToJson result:", zap.String("json", string(json)))
×
454

×
455
        return json, nil
×
456
}
457

458
func (g grpcHandler) grpcToJSONRPC(srv any, inStream grpc.ServerStream) error {
×
459
        method, ok := grpc.MethodFromServerStream(inStream)
×
460
        if !ok {
×
461
                return status.Errorf(codes.Internal, "could not determine method from server stream")
×
462
        }
×
463

464
        methodSegs := strings.Split(method, "/")
×
465
        method = methodSegs[len(methodSegs)-1]
×
466

×
467
        f := &codec.GrpcFrame{}
×
468
        if err := inStream.RecvMsg(f); err != nil {
×
469
                return status.Errorf(codes.Internal, "error receiving request; error: %+v", err)
×
470
        }
×
471

472
        params := new(any)
×
473

×
474
        if err := json.Unmarshal(f.Data, params); err != nil {
×
475
                return status.Errorf(codes.Internal, "error unmarshaling request; error: %+v", err)
×
476
        }
×
477

478
        jsonRPCReq, err := json2.EncodeClientRequest(method, params)
×
479

×
480
        if err != nil {
×
481
                return status.Errorf(codes.Internal, "error encoding request; error: %+v", err)
×
482
        }
×
483

484
        httpReq, err := http.NewRequest("POST", g.passthroughEndpoint, bytes.NewBuffer(jsonRPCReq))
×
485
        if err != nil {
×
486
                return status.Errorf(codes.Internal, "error creating http request; error: %+v", err)
×
487
        }
×
488

489
        httpReq.Header.Set("content-type", "application/json")
×
490
        httpResp, err := http.DefaultClient.Do(httpReq)
×
491

×
492
        if err != nil {
×
493
                return status.Errorf(codes.Internal, "error executing http call; error: %+v", err)
×
494
        }
×
495

496
        result := new(any)
×
497

×
498
        if err = json2.DecodeClientResponse(httpResp.Body, result); err != nil {
×
499
                return status.Errorf(codes.Internal, "json-rpc error; error: %+v", err)
×
500
        }
×
501

502
        respBytes, err := json.Marshal(result)
×
503

×
504
        if err != nil {
×
505
                return status.Errorf(codes.Internal, "error marshaling response; error: %+v", err)
×
506
        }
×
507

508
        f = &codec.GrpcFrame{Data: respBytes}
×
509

×
510
        if err = inStream.SendMsg(f); err != nil {
×
511
                return status.Errorf(codes.Internal, "error sending response; error: %+v", err)
×
512
        }
×
513

514
        return nil
×
515
}
516

517
type WrapperServerStream struct {
518
        sendHeaderCalled bool
519
        stream           grpc.ServerStream
520
        recvMessage      any
521
        sentMessage      any
522
        Ctx              context.Context
523
}
524

525
func (f *WrapperServerStream) SetTrailer(md metadata.MD) {
×
526
        f.stream.SetTrailer(md)
×
527
}
×
528

529
func NewWrapperServerStream(stream grpc.ServerStream, ctx context.Context) (grpc.ServerStream, error) {
6✔
530
        m := &codec.GrpcFrame{}
6✔
531
        err := stream.RecvMsg(m)
6✔
532
        f := &WrapperServerStream{
6✔
533
                stream:           stream,
6✔
534
                recvMessage:      m,
6✔
535
                sendHeaderCalled: false,
6✔
536
                Ctx:              ctx, // save modified ctx
6✔
537
        }
6✔
538
        return f, err
6✔
539
}
6✔
540

541
func (f *WrapperServerStream) Context() context.Context {
6✔
542
        // old way return f.stream.Context()
6✔
543
        return f.Ctx // return modified context
6✔
544
}
6✔
545

546
func (f *WrapperServerStream) SetHeader(md metadata.MD) error {
×
547
        return f.stream.SetHeader(md)
×
548
}
×
549

550
func (f *WrapperServerStream) SendHeader(md metadata.MD) error {
×
551
        //this is more of a hack to support dynamic pricing
×
552
        // when the service method returns the price in cogs, the SendHeader will be called,
×
553
        // we don't want this as the SendHeader can be called just once in the ServerStream
×
554
        if !f.sendHeaderCalled {
×
555
                return nil
×
556
        }
×
557
        f.sendHeaderCalled = true
×
558
        return f.stream.SendHeader(md)
×
559
}
560

561
func (f *WrapperServerStream) SendMsg(m any) error {
×
562
        return f.stream.SendMsg(m)
×
563
}
×
564

565
func (f *WrapperServerStream) RecvMsg(m any) error {
×
566
        return f.stream.RecvMsg(m)
×
567
}
×
568

569
func (f *WrapperServerStream) OriginalRecvMsg() any {
×
570
        return f.recvMessage
×
571
}
×
572

573
func (g grpcHandler) grpcToProcess(srv any, inStream grpc.ServerStream) error {
×
574
        method, ok := grpc.MethodFromServerStream(inStream)
×
575

×
576
        if !ok {
×
577
                return status.Errorf(codes.Internal, "could not determine method from server stream")
×
578
        }
×
579

580
        methodSegs := strings.Split(method, "/")
×
581
        method = methodSegs[len(methodSegs)-1]
×
582

×
583
        f := &codec.GrpcFrame{}
×
584
        if err := inStream.RecvMsg(f); err != nil {
×
585
                return status.Errorf(codes.Internal, "error receiving request; error: %+v", err)
×
586
        }
×
587

588
        cmd := exec.Command(g.executable, method)
×
589
        stdin, err := cmd.StdinPipe()
×
590

×
591
        if err != nil {
×
592
                return status.Errorf(codes.Internal, "error creating stdin pipe; error: %+v", err)
×
593
        }
×
594

595
        if _, err := stdin.Write(f.Data); err != nil {
×
596
                return status.Errorf(codes.Internal, "error writing to stdin; error: %+v", err)
×
597
        }
×
598
        stdin.Close()
×
599

×
600
        out, err := cmd.CombinedOutput()
×
601

×
602
        if err != nil {
×
603
                return status.Errorf(codes.Internal, "error executing process; error: %+v", err)
×
604
        }
×
605

606
        f = &codec.GrpcFrame{Data: out}
×
607

×
608
        if err = inStream.SendMsg(f); err != nil {
×
609
                return status.Errorf(codes.Internal, "error sending response; error: %+v", err)
×
610
        }
×
611

612
        return nil
×
613
}
614

615
func grpcLoopback(srv any, inStream grpc.ServerStream) error {
×
616
        f := &codec.GrpcFrame{}
×
617
        if err := inStream.RecvMsg(f); err != nil {
×
618
                return status.Errorf(codes.Internal, "error receiving request; error: %+v", err)
×
619
        }
×
620

621
        if err := inStream.SendMsg(f); err != nil {
×
622
                return status.Errorf(codes.Internal, "error sending response; error: %+v", err)
×
623
        }
×
624

625
        return nil
×
626
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc