• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

singnet / snet-daemon / #657

17 Aug 2025 09:40PM UTC coverage: 36.256%. First build
#657

Pull #642

semyon-dev
fix: tests
Pull Request #642: release v6.1.0

353 of 631 new or added lines in 43 files covered. (55.94%)

5681 of 15669 relevant lines covered (36.26%)

3.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.94
/handler/stream_interceptor.go
1
package handler
2

3
import (
	"fmt"
	"math/big"
	"strings"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/singnet/snet-daemon/v6/blockchain"
	"github.com/singnet/snet-daemon/v6/configuration_service"
	"github.com/singnet/snet-daemon/v6/metrics"
	"github.com/singnet/snet-daemon/v6/ratelimit"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)
23

24
const (
	// PaymentTypeHeader is a type of payment used to pay for a RPC call.
	// Supported types are: "escrow".
	// Note: "job" Payment type is deprecated
	PaymentTypeHeader = "snet-payment-type"
	// ClientTypeHeader identifies the client that calls the Daemon
	// (for example "snet-cli", "snet-dapp" or "snet-sdk").
	ClientTypeHeader = "snet-client-type"
	// UserInfoHeader carries a user address, for example
	// "0x94d04332C4f5273feF69c4a52D24f42a3aF1F207".
	UserInfoHeader = "snet-user-info"
	// UserAgentHeader carries the user agent details set on the server
	// stream info.
	UserAgentHeader = "user-agent"
	// PaymentChannelIDHeader is a MultiPartyEscrow contract payment channel
	// id. Value is a string containing a decimal number.
	PaymentChannelIDHeader = "snet-payment-channel-id"
	// PaymentChannelNonceHeader is a payment channel nonce value. Value is a
	// string containing a decimal number.
	PaymentChannelNonceHeader = "snet-payment-channel-nonce"
	// PaymentChannelAmountHeader is an amount of payment channel value
	// which server is authorized to withdraw after handling the RPC call.
	// Value is a string containing a decimal number.
	PaymentChannelAmountHeader = "snet-payment-channel-amount"
	// PaymentChannelSignatureHeader is a signature of the client to confirm
	// amount withdrawing authorization. Value is an array of bytes.
	PaymentChannelSignatureHeader = "snet-payment-channel-signature-bin"
	// PaymentMultiPartyEscrowAddressHeader carries the MultiPartyEscrow (MPE)
	// contract address. All clients have this information because they need
	// it to sign anyway. When the Daemon runs in blockchain-disabled mode it
	// uses this header to get the MPE address. The goal is to keep the client
	// oblivious to whether the Daemon runs with the blockchain enabled or
	// disabled, and to standardize the signatures.
	PaymentMultiPartyEscrowAddressHeader = "snet-payment-mpe-address"

	// Headers below were added for free call support in the Daemon.

	// FreeCallUserIdHeader is the user id of the person making the call.
	FreeCallUserIdHeader = "snet-free-call-user-id"
	// FreeCallUserAddressHeader presumably carries the address of the user
	// making the free call. NOTE(review): inferred from the name, confirm
	// against the free call payment handler.
	FreeCallUserAddressHeader = "snet-free-call-user-address"

	// CurrentBlockNumberHeader is used to check if the signature is still
	// valid.
	CurrentBlockNumberHeader = "snet-current-block-number"

	// FreeCallAuthTokenHeader is the placeholder for the issued free call
	// auth token.
	FreeCallAuthTokenHeader = "snet-free-call-auth-token-bin"
	// Block number on when the token was issued, to track the expiry of the
	// token, which is ~ 1 month (currently disabled):
	//FreeCallAuthTokenExpiryBlockNumberHeader = "snet-free-call-token-expiry-block"

	// PrePaidAuthTokenHeader carries the prepaid auth token. Users may decide
	// to sign upfront and make calls; the Daemon generates an auth token, and
	// users/clients need to use this token to make calls for the amount
	// signed upfront.
	PrePaidAuthTokenHeader = "snet-prepaid-auth-token-bin"

	// DynamicPriceDerived is the metadata key "snet-derived-dynamic-price-cost".
	// NOTE(review): presumably holds the price derived by dynamic pricing;
	// its use is not visible in this file.
	DynamicPriceDerived = "snet-derived-dynamic-price-cost"

	// TrainingModelId is the metadata key "snet-train-model-id".
	// NOTE(review): presumably identifies a training model; its use is not
	// visible in this file.
	TrainingModelId = "snet-train-model-id"
)
78

79
// GrpcStreamContext contains information about gRPC call which is used to
80
// validate payment and pricing.
81
type GrpcStreamContext struct {
82
        MD       metadata.MD
83
        Info     *grpc.StreamServerInfo
84
        InStream grpc.ServerStream
85
}
86

87
func (context *GrpcStreamContext) String() string {
×
88
        return fmt.Sprintf("{MD: %v, Info: %v", context.MD, *context.Info)
×
89
}
×
90

91
// Payment is the payment-handler-specific value that a handler validates and
// later uses to complete the payment. Its concrete type is chosen by the
// handler that produced it.
type Payment any
94

95
// Custom gRPC codes to return to the client
96
const (
97
        // IncorrectNonce is returned to client when payment received contains
98
        // incorrect nonce value. Client may use PaymentChannelStateService to get
99
        // latest channel state and correct nonce value.
100
        IncorrectNonce codes.Code = 1000
101
)
102

103
// GrpcError is an error which will be returned by interceptor via gRPC
104
// protocol. Part of information will be returned as header metadata.
105
type GrpcError struct {
106
        // Status is a gRPC call status
107
        Status *status.Status
108
}
109

110
func (err *GrpcError) Error() string {
1✔
111
        return err.String()
1✔
112
}
1✔
113

114
// Err returns error to return correct gRPC error to the caller
115
func (err *GrpcError) Err() error {
4✔
116
        if err.Status == nil {
4✔
117
                return nil
×
118
        }
×
119
        return err.Status.Err()
4✔
120
}
121

122
// String converts GrpcError to string
123
func (err *GrpcError) String() string {
1✔
124
        return fmt.Sprintf("{Status: %v}", err.Status)
1✔
125
}
1✔
126

127
// NewGrpcError returns new error which contains gRPC status with provided code
128
// and message
129
func NewGrpcError(code codes.Code, message string) *GrpcError {
11✔
130
        return &GrpcError{
11✔
131
                Status: status.New(code, message),
11✔
132
        }
11✔
133
}
11✔
134

135
// NewGrpcErrorf returns new error which contains gRPC status with provided
136
// code and message formed from format string and args.
137
func NewGrpcErrorf(code codes.Code, format string, args ...any) *GrpcError {
23✔
138
        if len(args) == 0 {
31✔
139
                return &GrpcError{
8✔
140
                        Status: status.New(code, format),
8✔
141
                }
8✔
142
        }
8✔
143
        return &GrpcError{
15✔
144
                Status: status.Newf(code, format, args...),
15✔
145
        }
15✔
146
}
147

148
// StreamPaymentHandler interface which is used by gRPC interceptor to get, validate
149
// and complete payment. There are two payment handler implementations so far:
150
// jobPaymentHandler and escrowPaymentHandler. jobPaymentHandler is deprecated.
151
type StreamPaymentHandler interface {
152
        // Type is a content of PaymentTypeHeader field which triggers usage of the
153
        // payment handler.
154
        Type() (typ string)
155
        // Payment extracts payment data from gRPC request context and checks
156
        // validity of payment data. It returns nil if data is valid or
157
        // appropriate gRPC status otherwise.
158
        Payment(context *GrpcStreamContext) (payment Payment, err *GrpcError)
159
        // Complete completes payment if gRPC call was successfully proceeded by
160
        // service.
161
        Complete(payment Payment) (err *GrpcError)
162
        // CompleteAfterError completes payment if service returns error.
163
        CompleteAfterError(payment Payment, result error) (err *GrpcError)
164
}
165

166
type rateLimitInterceptor struct {
167
        rateLimiter                   rate.Limiter
168
        messageBroadcaster            *configuration_service.MessageBroadcaster
169
        processRequest                int
170
        requestProcessingNotification chan int
171
}
172

173
func GrpcRateLimitInterceptor(broadcast *configuration_service.MessageBroadcaster) grpc.StreamServerInterceptor {
×
174
        interceptor := &rateLimitInterceptor{
×
175
                rateLimiter:                   *ratelimit.NewRateLimiter(),
×
176
                messageBroadcaster:            broadcast,
×
NEW
177
                processRequest:                configuration_service.StartProcessingAnyRequest,
×
178
                requestProcessingNotification: broadcast.NewSubscriber(),
×
179
        }
×
180
        go interceptor.startOrStopProcessingAnyRequests()
×
181
        return interceptor.intercept
×
182
}
×
183

184
func (interceptor *rateLimitInterceptor) startOrStopProcessingAnyRequests() {
×
185
        for {
×
186
                interceptor.processRequest = <-interceptor.requestProcessingNotification
×
187
        }
×
188
}
189

NEW
190
func GrpcMeteringInterceptor(currentBlock func() (*big.Int, error)) grpc.StreamServerInterceptor {
×
NEW
191
        return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
NEW
192
                return interceptMetering(srv, ss, info, handler, currentBlock)
×
NEW
193
        }
×
194
}
195

196
// Monitor requests arrived, and responses sent and publish these stats for Reporting
197
func interceptMetering(
198
        srv any,
199
        ss grpc.ServerStream,
200
        info *grpc.StreamServerInfo,
201
        handler grpc.StreamHandler,
202
        currentBlock func() (*big.Int, error),
NEW
203
) error {
×
204
        var (
×
205
                err   error
×
206
                start time.Time
×
207
        )
×
208
        start = time.Now()
×
209

×
NEW
210
        methodName, _ := grpc.MethodFromServerStream(ss)
×
211
        commonStats := metrics.BuildCommonStats(start, methodName)
×
NEW
212

×
NEW
213
        if ctx, err := getGrpcContext(ss, info); err == nil {
×
NEW
214
                setAdditionalDetails(ctx, commonStats)
×
215
        }
×
216

217
        defer func() {
×
NEW
218
                var block *big.Int
×
NEW
219
                if currentBlock != nil {
×
NEW
220
                        block, _ = currentBlock()
×
NEW
221
                }
×
NEW
222
                go metrics.PublishResponseStats(commonStats, time.Since(start), err, block)
×
223
        }()
224

225
        err = handler(srv, ss)
×
226
        if err != nil {
×
227
                zap.L().Error(err.Error())
×
228
                return err
×
229
        }
×
230
        return nil
×
231
}
232

233
func (interceptor *rateLimitInterceptor) intercept(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
234

×
NEW
235
        if interceptor.processRequest == configuration_service.StopProcessingAnyRequest {
×
236
                return status.New(codes.Unavailable, "No requests are currently being processed, please try again later").Err()
×
237
        }
×
238
        if !interceptor.rateLimiter.Allow() {
×
239
                zap.L().Info("rate limit reached, too many requests to handle", zap.Any("rateLimiter.Burst()", interceptor.rateLimiter.Burst()))
×
240
                return status.New(codes.ResourceExhausted, "rate limiting , too many requests to handle").Err()
×
241
        }
×
242
        err := handler(srv, ss)
×
243
        if err != nil {
×
244
                zap.L().Error(err.Error())
×
245
                return err
×
246
        }
×
247
        return nil
×
248
}
249

250
// GrpcPaymentValidationInterceptor returns gRPC interceptor to validate payment.
251
// If the blockchain is disabled, then noOpInterceptor is returned.
252
func GrpcPaymentValidationInterceptor(serviceData *blockchain.ServiceMetadata, defaultPaymentHandler StreamPaymentHandler, paymentHandler ...StreamPaymentHandler) grpc.StreamServerInterceptor {
1✔
253
        interceptor := &paymentValidationInterceptor{
1✔
254
                defaultPaymentHandler: defaultPaymentHandler,
1✔
255
                paymentHandlers:       make(map[string]StreamPaymentHandler),
1✔
256
                serviceMetadata:       serviceData,
1✔
257
        }
1✔
258

1✔
259
        interceptor.paymentHandlers[defaultPaymentHandler.Type()] = defaultPaymentHandler
1✔
260
        zap.L().Info("Default payment handler registered", zap.Any("defaultPaymentType", defaultPaymentHandler.Type()))
1✔
261
        for _, handler := range paymentHandler {
2✔
262
                interceptor.paymentHandlers[handler.Type()] = handler
1✔
263
                zap.L().Info("Payment handler for type registered", zap.Any("paymentType", handler.Type()))
1✔
264
        }
1✔
265
        return interceptor.streamIntercept
1✔
266
}
267

268
type paymentValidationInterceptor struct {
269
        serviceMetadata       *blockchain.ServiceMetadata
270
        defaultPaymentHandler StreamPaymentHandler
271
        paymentHandlers       map[string]StreamPaymentHandler
272
}
273

274
func (interceptor *paymentValidationInterceptor) streamIntercept(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (e error) {
6✔
275
        var err *GrpcError
6✔
276

6✔
277
        // read ctx and prepare GrpcStreamContext
6✔
278
        grpcCtx, err := getGrpcContext(ss, info)
6✔
279
        if err != nil {
6✔
280
                return err.Err()
×
281
        }
×
282

283
        //wrapperStream := ss
284

285
        //if config.GetBool(config.EnableDynamicPricing) {
286
        //var streamError error
287
        wrapperStream, streamError := NewWrapperServerStream(ss, grpcCtx.InStream.Context())
6✔
288
        if streamError != nil {
6✔
NEW
289
                return streamError
×
NEW
290
        }
×
291
        //}
292

293
        // Now we are working with grpcCtx and wrapperStream further
294
        paymentHandler, err := interceptor.getPaymentHandler(grpcCtx)
6✔
295
        if err != nil {
6✔
296
                return err.Err()
×
297
        }
×
298

299
        zap.L().Debug("[streamIntercept] New gRPC call received", zap.Any("context", grpcCtx))
6✔
300

6✔
301
        payment, err := paymentHandler.Payment(grpcCtx)
6✔
302
        if err != nil {
7✔
303
                return err.Err()
1✔
304
        }
1✔
305

306
        if sp, ok := payment.(SenderProvider); ok {
5✔
NEW
307
                // copy the original incoming MD
×
NEW
308
                outMD := grpcCtx.MD.Copy()
×
NEW
309
                // retrieve the address
×
NEW
310
                ethAddr := sp.GetSender().Hex()
×
NEW
311
                outMD.Set("user-address", ethAddr)
×
NEW
312
                outMD.Set("daemon-debug", "streamIntercept")
×
NEW
313
                // update the stored metadata in grpcCtx
×
NEW
314
                grpcCtx.MD = outMD
×
NEW
315

×
NEW
316
                // and update the context inside our WrapperServerStream
×
NEW
317
                if ws, ok := wrapperStream.(*WrapperServerStream); ok {
×
NEW
318
                        ws.Ctx = metadata.NewIncomingContext(ws.Ctx, outMD)
×
NEW
319
                }
×
320
        }
321

322
        defer func() {
10✔
323
                if r := recover(); r != nil {
6✔
324
                        zap.L().Warn("Service handler called panic(panicValue)", zap.Any("panicValue", r))
1✔
325
                        paymentHandler.CompleteAfterError(payment, fmt.Errorf("service handler called panic(%v)", r))
1✔
326
                        panic("re-panic after payment handler error handling")
1✔
327
                } else if e == nil {
6✔
328
                        err = paymentHandler.Complete(payment)
2✔
329
                        if err != nil {
3✔
330
                                // return err.Err()
1✔
331
                                e = err.Err()
1✔
332
                        }
1✔
333
                } else {
2✔
334
                        err = paymentHandler.CompleteAfterError(payment, e)
2✔
335
                        if err != nil {
3✔
336
                                // return err.Err()
1✔
337
                                e = err.Err()
1✔
338
                        }
1✔
339
                }
340
        }()
341

342
        zap.L().Debug("[streamIntercept] New payment received", zap.Any("payment", payment))
5✔
343

5✔
344
        e = handler(srv, wrapperStream)
5✔
345
        if e != nil {
7✔
346
                zap.L().Warn("[streamIntercept] gRPC handler returned error", zap.Error(e))
2✔
347
                return e
2✔
348
        }
2✔
349

350
        return nil
2✔
351
}
352

353
func getGrpcContext(
354
        serverStream grpc.ServerStream,
355
        info *grpc.StreamServerInfo,
356
) (*GrpcStreamContext, *GrpcError) {
6✔
357
        md, ok := metadata.FromIncomingContext(serverStream.Context())
6✔
358
        if !ok {
6✔
359
                zap.L().Error("Invalid metadata", zap.Any("info", info))
×
360
                return nil, NewGrpcError(codes.InvalidArgument, "missing metadata")
×
361
        }
×
362

363
        // 2) Make a copy of the metadata so that we can modify it
364
        mdCopy := md.Copy()
6✔
365

6✔
366
        // 3) Create a new context based on the original, but with our copy of the metadata
6✔
367
        newCtx := metadata.NewIncomingContext(serverStream.Context(), mdCopy)
6✔
368

6✔
369
        // 4) Wrap the original ServerStream so that Context() returns our newCtx
6✔
370
        wrappedStream := &WrapperServerStream{
6✔
371
                stream:           serverStream,
6✔
372
                recvMessage:      nil, // nil here because we haven’t called RecvMsg yet
6✔
373
                sendHeaderCalled: false,
6✔
374
                Ctx:              newCtx,
6✔
375
        }
6✔
376

6✔
377
        // 5) Return a GrpcStreamContext with the metadata copy and the wrapped stream
6✔
378
        return &GrpcStreamContext{
6✔
379
                MD:       mdCopy,
6✔
380
                Info:     info,
6✔
381
                InStream: wrappedStream,
6✔
382
        }, nil
6✔
383
}
384

385
func (interceptor *paymentValidationInterceptor) getPaymentHandler(context *GrpcStreamContext) (handler StreamPaymentHandler, err *GrpcError) {
6✔
386
        paymentTypeMd, ok := context.MD[PaymentTypeHeader]
6✔
387
        if !ok || len(paymentTypeMd) == 0 {
6✔
388
                zap.L().Debug("Payment type was not set by caller, return default payment handler",
×
389
                        zap.String("defaultPaymentHandlerType", interceptor.defaultPaymentHandler.Type()))
×
390
                return interceptor.defaultPaymentHandler, nil
×
391
        }
×
392

393
        paymentType := paymentTypeMd[0]
6✔
394
        zap.L().Debug("Payment metadata", zap.String("paymentType", paymentType), zap.Any("paymentTypeMd", paymentTypeMd))
6✔
395
        paymentHandler, ok := interceptor.paymentHandlers[paymentType]
6✔
396
        if !ok {
6✔
397
                zap.L().Error("Unexpected payment type", zap.String("paymentType", paymentType))
×
398
                return nil, NewGrpcErrorf(codes.InvalidArgument, "unexpected \"%v\", value: \"%v\"", PaymentTypeHeader, paymentType)
×
399
        }
×
400

401
        zap.L().Debug("Return payment handler by type", zap.Any("paymentType", paymentType))
6✔
402
        return paymentHandler, nil
6✔
403
}
404

405
// GetBigInt gets big.Int value from gRPC metadata
406
func GetBigInt(md metadata.MD, key string) (value *big.Int, err *GrpcError) {
31✔
407
        str, err := GetSingleValue(md, key)
31✔
408
        if err != nil {
36✔
409
                return
5✔
410
        }
5✔
411

412
        value = big.NewInt(0)
26✔
413
        e := value.UnmarshalText([]byte(str))
26✔
414
        if e != nil {
27✔
415
                return nil, NewGrpcErrorf(codes.InvalidArgument, "incorrect format \"%v\": \"%v\"", key, str)
1✔
416
        }
1✔
417

418
        return
25✔
419
}
420

421
// GetBytes gets bytes array value from gRPC metadata for key with '-bin'
422
// suffix, internally this data is encoded as base64
423
func GetBytes(md metadata.MD, key string) (result []byte, err *GrpcError) {
13✔
424
        if !strings.HasSuffix(key, "-bin") {
14✔
425
                return nil, NewGrpcErrorf(codes.InvalidArgument, "incorrect binary key name \"%v\"", key)
1✔
426
        }
1✔
427

428
        str, err := GetSingleValue(md, key)
12✔
429
        if err != nil {
13✔
430
                return
1✔
431
        }
1✔
432

433
        return []byte(str), nil
11✔
434
}
435

436
// GetBytesFromHex gets bytes array value from gRPC metadata, bytes array is
437
// encoded as hex string
438
func GetBytesFromHex(md metadata.MD, key string) (value []byte, err *GrpcError) {
4✔
439
        str, err := GetSingleValue(md, key)
4✔
440
        if err != nil {
6✔
441
                return
2✔
442
        }
2✔
443
        return common.FromHex(str), nil
2✔
444
}
445

446
// GetSingleValue gets string value from gRPC metadata
447
func GetSingleValue(md metadata.MD, key string) (value string, err *GrpcError) {
55✔
448
        array := md.Get(key)
55✔
449

55✔
450
        if len(array) == 0 {
65✔
451
                return "", NewGrpcErrorf(codes.InvalidArgument, "missing \"%v\"", key)
10✔
452
        }
10✔
453

454
        if len(array) > 1 {
47✔
455
                return "", NewGrpcErrorf(codes.InvalidArgument, "too many values for key \"%v\": %v", key, array)
2✔
456
        }
2✔
457

458
        return array[0], nil
43✔
459
}
460

461
// NoOpInterceptor is a gRPC interceptor which doesn't do payment checking.
462
func NoOpInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo,
463
        handler grpc.StreamHandler) error {
×
464
        return handler(srv, ss)
×
465
}
×
466

467
// set Additional details on the metrics persisted, this is to keep track of how many calls were made per channel
468
func setAdditionalDetails(context *GrpcStreamContext, stats *metrics.CommonStats) {
×
469
        md := context.MD
×
470
        if str, err := GetSingleValue(md, ClientTypeHeader); err == nil {
×
471
                stats.ClientType = str
×
472
        }
×
473
        if str, err := GetSingleValue(md, UserInfoHeader); err == nil {
×
474
                stats.UserDetails = str
×
475
        }
×
476
        if str, err := GetSingleValue(md, UserAgentHeader); err == nil {
×
477
                stats.UserAgent = str
×
478
        }
×
479
        if str, err := GetSingleValue(md, PaymentChannelIDHeader); err == nil {
×
480
                stats.ChannelId = str
×
481
        }
×
482
        if str, err := GetSingleValue(md, FreeCallUserIdHeader); err == nil {
×
483
                stats.UserName = str
×
484
        }
×
485
        if str, err := GetSingleValue(md, PaymentTypeHeader); err == nil {
×
486
                stats.PaymentMode = str
×
487
        }
×
488
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc