• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

singnet / snet-daemon / #655

14 Jul 2025 02:31PM UTC coverage: 36.221% (+2.4%) from 33.794%
#655

push

web-flow
Merge pull request #629 from singnet/master

update dev

0 of 3 new or added lines in 2 files covered. (0.0%)

2114 existing lines in 43 files now uncovered.

5581 of 15408 relevant lines covered (36.22%)

3.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.84
/handler/stream_interceptor.go
1
package handler
2

3
import (
4
        "fmt"
5
        "github.com/ethereum/go-ethereum/common"
6
        "github.com/singnet/snet-daemon/v6/blockchain"
7
        "github.com/singnet/snet-daemon/v6/config"
8
        "github.com/singnet/snet-daemon/v6/configuration_service"
9
        "github.com/singnet/snet-daemon/v6/metrics"
10
        "github.com/singnet/snet-daemon/v6/ratelimit"
11
        "go.uber.org/zap"
12

13
        "math/big"
14
        "strings"
15
        "time"
16

17
        "golang.org/x/time/rate"
18
        "google.golang.org/grpc"
19
        "google.golang.org/grpc/codes"
20
        "google.golang.org/grpc/metadata"
21
        "google.golang.org/grpc/status"
22
)
23

24
const (
25
        // PaymentTypeHeader is a type of payment used to pay for a RPC call.
26
        // Supported types are: "escrow".
27
        // Note: "job" Payment type is deprecated
28
        PaymentTypeHeader = "snet-payment-type"
29
        // Client that calls the Daemon ( example can be "snet-cli","snet-dapp","snet-sdk")
30
        ClientTypeHeader = "snet-client-type"
31
        // Value is a user address , example "0x94d04332C4f5273feF69c4a52D24f42a3aF1F207"
32
        UserInfoHeader = "snet-user-info"
33
        // User Agent details set in on the server stream info
34
        UserAgentHeader = "user-agent"
35
        // PaymentChannelIDHeader is a MultiPartyEscrow contract payment channel
36
        // id. Value is a string containing a decimal number.
37
        PaymentChannelIDHeader = "snet-payment-channel-id"
38
        // PaymentChannelNonceHeader is a payment channel nonce value. Value is a
39
        // string containing a decimal number.
40
        PaymentChannelNonceHeader = "snet-payment-channel-nonce"
41
        // PaymentChannelAmountHeader is an amount of payment channel value
42
        // which server is authorized to withdraw after handling the RPC call.
43
        // Value is a string containing a decimal number.
44
        PaymentChannelAmountHeader = "snet-payment-channel-amount"
45
        // PaymentChannelSignatureHeader is a signature of the client to confirm
46
        // amount withdrawing authorization. Value is an array of bytes.
47
        PaymentChannelSignatureHeader = "snet-payment-channel-signature-bin"
48
        // This is useful information in the header sent in by the client
49
        // All clients will have this information and they need this to Sign anyways
50
        // When Daemon is running in the block chain disabled mode , it would use this
51
        // header to get the MPE address. The goal here is to keep the client oblivious to the
52
        // Daemon block chain enabled or disabled mode and also standardize the signatures.
53
        // id. Value is a string containing a decimal number.
54
        PaymentMultiPartyEscrowAddressHeader = "snet-payment-mpe-address"
55

56
        //Added for free call support in Daemon
57

58
        //The user Id of the person making the call
59
        FreeCallUserIdHeader      = "snet-free-call-user-id"
60
        FreeCallUserAddressHeader = "snet-free-call-user-address"
61

62
        //Will be used to check if the Signature is still valid
63
        CurrentBlockNumberHeader = "snet-current-block-number"
64

65
        //Place holder to set the free call Auth Token issued
66
        FreeCallAuthTokenHeader = "snet-free-call-auth-token-bin"
67
        //Block number on when the Token was issued, to track the expiry of the token, which is ~ 1 Month
68
        //FreeCallAuthTokenExpiryBlockNumberHeader = "snet-free-call-token-expiry-block"
69

70
        //Users may decide to sign upfront and make calls .Daemon generates and Auth Token
71
        //Users/Clients will need to use this token to make calls for the amount signed upfront.
72
        PrePaidAuthTokenHeader = "snet-prepaid-auth-token-bin"
73

74
        DynamicPriceDerived = "snet-derived-dynamic-price-cost"
75

76
        TrainingModelId = "snet-train-model-id"
77
)
78

79
// GrpcStreamContext contains information about gRPC call which is used to
80
// validate payment and pricing.
81
type GrpcStreamContext struct {
82
        MD       metadata.MD
83
        Info     *grpc.StreamServerInfo
84
        InStream grpc.ServerStream
85
}
86

87
func (context *GrpcStreamContext) String() string {
×
88
        return fmt.Sprintf("{MD: %v, Info: %v", context.MD, *context.Info)
×
UNCOV
89
}
×
90

91
// Payment represents payment handler specific data which is validated
92
// and used to complete payment.
93
type Payment any
94

95
// Custom gRPC codes to return to the client
96
const (
97
        // IncorrectNonce is returned to client when payment received contains
98
        // incorrect nonce value. Client may use PaymentChannelStateService to get
99
        // latest channel state and correct nonce value.
100
        IncorrectNonce codes.Code = 1000
101
)
102

103
// GrpcError is an error which will be returned by interceptor via gRPC
104
// protocol. Part of information will be returned as header metadata.
105
type GrpcError struct {
106
        // Status is a gRPC call status
107
        Status *status.Status
108
}
109

110
func (err *GrpcError) Error() string {
1✔
111
        return err.String()
1✔
112
}
1✔
113

114
// Err returns error to return correct gRPC error to the caller
115
func (err *GrpcError) Err() error {
5✔
116
        if err.Status == nil {
5✔
UNCOV
117
                return nil
×
118
        }
×
119
        return err.Status.Err()
5✔
120
}
121

122
// String converts GrpcError to string
123
func (err *GrpcError) String() string {
1✔
124
        return fmt.Sprintf("{Status: %v}", err.Status)
1✔
125
}
1✔
126

127
// NewGrpcError returns new error which contains gRPC status with provided code
128
// and message
129
func NewGrpcError(code codes.Code, message string) *GrpcError {
11✔
130
        return &GrpcError{
11✔
131
                Status: status.New(code, message),
11✔
132
        }
11✔
133
}
11✔
134

135
// NewGrpcErrorf returns new error which contains gRPC status with provided
136
// code and message formed from format string and args.
137
func NewGrpcErrorf(code codes.Code, format string, args ...any) *GrpcError {
25✔
138
        if len(args) == 0 {
35✔
139
                return &GrpcError{
10✔
140
                        Status: status.New(code, format),
10✔
141
                }
10✔
142
        }
10✔
143
        return &GrpcError{
15✔
144
                Status: status.Newf(code, format, args...),
15✔
145
        }
15✔
146
}
147

148
// StreamPaymentHandler interface which is used by gRPC interceptor to get, validate
149
// and complete payment. There are two payment handler implementations so far:
150
// jobPaymentHandler and escrowPaymentHandler. jobPaymentHandler is deprecated.
151
type StreamPaymentHandler interface {
152
        // Type is a content of PaymentTypeHeader field which triggers usage of the
153
        // payment handler.
154
        Type() (typ string)
155
        // Payment extracts payment data from gRPC request context and checks
156
        // validity of payment data. It returns nil if data is valid or
157
        // appropriate gRPC status otherwise.
158
        Payment(context *GrpcStreamContext) (payment Payment, err *GrpcError)
159
        // Complete completes payment if gRPC call was successfully proceeded by
160
        // service.
161
        Complete(payment Payment) (err *GrpcError)
162
        // CompleteAfterError completes payment if service returns error.
163
        CompleteAfterError(payment Payment, result error) (err *GrpcError)
164
}
165

166
type rateLimitInterceptor struct {
167
        rateLimiter                   rate.Limiter
168
        messageBroadcaster            *configuration_service.MessageBroadcaster
169
        processRequest                int
170
        requestProcessingNotification chan int
171
}
172

UNCOV
173
func GrpcRateLimitInterceptor(broadcast *configuration_service.MessageBroadcaster) grpc.StreamServerInterceptor {
×
174
        interceptor := &rateLimitInterceptor{
×
175
                rateLimiter:                   *ratelimit.NewRateLimiter(),
×
176
                messageBroadcaster:            broadcast,
×
177
                processRequest:                configuration_service.START_PROCESSING_ANY_REQUEST,
×
UNCOV
178
                requestProcessingNotification: broadcast.NewSubscriber(),
×
UNCOV
179
        }
×
180
        go interceptor.startOrStopProcessingAnyRequests()
×
181
        return interceptor.intercept
×
182
}
×
183

UNCOV
184
func (interceptor *rateLimitInterceptor) startOrStopProcessingAnyRequests() {
×
185
        for {
×
186
                interceptor.processRequest = <-interceptor.requestProcessingNotification
×
187
        }
×
188
}
189

190
func GrpcMeteringInterceptor() grpc.StreamServerInterceptor {
×
191
        return interceptMetering
×
192
}
×
193

194
// Monitor requests arrived and responses sent and publish these stats for Reporting
195
func interceptMetering(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
196
        var (
×
197
                err   error
×
198
                start time.Time
×
199
        )
×
UNCOV
200
        start = time.Now()
×
201
        //Get the method name
×
202
        methodName, _ := grpc.MethodFromServerStream(ss)
×
203
        //Get the Context
×
204

×
205
        //Build common stats and use this to set request stats and response stats
×
206
        commonStats := metrics.BuildCommonStats(start, methodName)
×
207
        if context, err := getGrpcContext(ss, info); err == nil {
×
208
                setAdditionalDetails(context, commonStats)
×
209
        }
×
210

UNCOV
211
        defer func() {
×
212
                go metrics.PublishResponseStats(commonStats, time.Since(start), err)
×
213
        }()
×
214
        err = handler(srv, ss)
×
215
        if err != nil {
×
216
                zap.L().Error(err.Error())
×
217
                return err
×
218
        }
×
219
        return nil
×
220
}
221

222
func (interceptor *rateLimitInterceptor) intercept(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
×
223

×
224
        if interceptor.processRequest == configuration_service.STOP_PROCESING_ANY_REQUEST {
×
225
                return status.New(codes.Unavailable, "No requests are currently being processed, please try again later").Err()
×
226
        }
×
UNCOV
227
        if !interceptor.rateLimiter.Allow() {
×
UNCOV
228
                zap.L().Info("rate limit reached, too many requests to handle", zap.Any("rateLimiter.Burst()", interceptor.rateLimiter.Burst()))
×
UNCOV
229
                return status.New(codes.ResourceExhausted, "rate limiting , too many requests to handle").Err()
×
UNCOV
230
        }
×
UNCOV
231
        err := handler(srv, ss)
×
UNCOV
232
        if err != nil {
×
UNCOV
233
                zap.L().Error(err.Error())
×
UNCOV
234
                return err
×
UNCOV
235
        }
×
UNCOV
236
        return nil
×
237
}
238

239
// GrpcPaymentValidationInterceptor returns gRPC interceptor to validate payment. If
240
// blockchain is disabled then noOpInterceptor is returned.
241
func GrpcPaymentValidationInterceptor(serviceData *blockchain.ServiceMetadata, defaultPaymentHandler StreamPaymentHandler, paymentHandler ...StreamPaymentHandler) grpc.StreamServerInterceptor {
1✔
242
        interceptor := &paymentValidationInterceptor{
1✔
243
                defaultPaymentHandler: defaultPaymentHandler,
1✔
244
                paymentHandlers:       make(map[string]StreamPaymentHandler),
1✔
245
                serviceMetadata:       serviceData,
1✔
246
        }
1✔
247

1✔
248
        interceptor.paymentHandlers[defaultPaymentHandler.Type()] = defaultPaymentHandler
1✔
249
        zap.L().Info("Default payment handler registered", zap.Any("defaultPaymentType", defaultPaymentHandler.Type()))
1✔
250
        for _, handler := range paymentHandler {
2✔
251
                interceptor.paymentHandlers[handler.Type()] = handler
1✔
252
                zap.L().Info("Payment handler for type registered", zap.Any("paymentType", handler.Type()))
1✔
253
        }
1✔
254
        return interceptor.streamIntercept
1✔
255
}
256

257
type paymentValidationInterceptor struct {
258
        serviceMetadata       *blockchain.ServiceMetadata
259
        defaultPaymentHandler StreamPaymentHandler
260
        paymentHandlers       map[string]StreamPaymentHandler
261
}
262

263
func (interceptor *paymentValidationInterceptor) streamIntercept(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (e error) {
6✔
264
        var err *GrpcError
6✔
265
        wrapperStream := ss
6✔
266
        // check we need to have dynamic pricing here
6✔
267
        // if yes, then use the wrapper Stream
6✔
268
        if config.GetBool(config.EnableDynamicPricing) {
6✔
UNCOV
269
                var streamError error
×
UNCOV
270
                if wrapperStream, streamError = NewWrapperServerStream(ss); streamError != nil {
×
UNCOV
271
                        return streamError
×
UNCOV
272
                }
×
273
        }
274

275
        context, err := getGrpcContext(wrapperStream, info)
6✔
276
        if err != nil {
6✔
UNCOV
277
                return err.Err()
×
UNCOV
278
        }
×
279

280
        //zap.L().Debug("[streamIntercept] New gRPC call received", zap.Any("context", context))
281

282
        paymentHandler, err := interceptor.getPaymentHandler(context)
6✔
283
        if err != nil {
6✔
UNCOV
284
                return err.Err()
×
UNCOV
285
        }
×
286

287
        payment, err := paymentHandler.Payment(context)
6✔
288
        if err != nil {
7✔
289
                return err.Err()
1✔
290
        }
1✔
291

292
        defer func() {
10✔
293
                if r := recover(); r != nil {
6✔
294
                        zap.L().Warn("Service handler called panic(panicValue)", zap.Any("panicValue", r))
1✔
295
                        paymentHandler.CompleteAfterError(payment, fmt.Errorf("service handler called panic(%v)", r))
1✔
296
                        panic("re-panic after payment handler error handling")
1✔
297
                } else if e == nil {
6✔
298
                        err = paymentHandler.Complete(payment)
2✔
299
                        if err != nil {
3✔
300
                                // return err.Err()
1✔
301
                                e = err.Err()
1✔
302
                        }
1✔
303
                } else {
2✔
304
                        err = paymentHandler.CompleteAfterError(payment, e)
2✔
305
                        if err != nil {
3✔
306
                                // return err.Err()
1✔
307
                                e = err.Err()
1✔
308
                        }
1✔
309
                }
310
        }()
311

312
        zap.L().Debug("[streamIntercept] New payment received", zap.Any("payment", payment))
5✔
313

5✔
314
        e = handler(srv, wrapperStream)
5✔
315
        if e != nil {
7✔
316
                zap.L().Warn("[streamIntercept] gRPC handler returned error", zap.Error(e))
2✔
317
                return e
2✔
318
        }
2✔
319

320
        return nil
2✔
321
}
322

323
func getGrpcContext(serverStream grpc.ServerStream, info *grpc.StreamServerInfo) (context *GrpcStreamContext, err *GrpcError) {
6✔
324
        md, ok := metadata.FromIncomingContext(serverStream.Context())
6✔
325
        if !ok {
6✔
UNCOV
326
                zap.L().Error("Invalid metadata", zap.Any("info", info))
×
UNCOV
327
                return nil, NewGrpcError(codes.InvalidArgument, "missing metadata")
×
UNCOV
328
        }
×
329

330
        return &GrpcStreamContext{
6✔
331
                MD:       md,
6✔
332
                Info:     info,
6✔
333
                InStream: serverStream,
6✔
334
        }, nil
6✔
335
}
336

337
func (interceptor *paymentValidationInterceptor) getPaymentHandler(context *GrpcStreamContext) (handler StreamPaymentHandler, err *GrpcError) {
6✔
338
        paymentTypeMd, ok := context.MD[PaymentTypeHeader]
6✔
339
        if !ok || len(paymentTypeMd) == 0 {
6✔
340
                zap.L().Debug("Payment type was not set by caller, return default payment handler",
×
341
                        zap.String("defaultPaymentHandlerType", interceptor.defaultPaymentHandler.Type()))
×
UNCOV
342
                return interceptor.defaultPaymentHandler, nil
×
UNCOV
343
        }
×
344

345
        paymentType := paymentTypeMd[0]
6✔
346
        zap.L().Debug("Payment metadata", zap.String("paymentType", paymentType), zap.Any("paymentTypeMd", paymentTypeMd))
6✔
347
        paymentHandler, ok := interceptor.paymentHandlers[paymentType]
6✔
348
        if !ok {
6✔
UNCOV
349
                zap.L().Error("Unexpected payment type", zap.String("paymentType", paymentType))
×
UNCOV
350
                return nil, NewGrpcErrorf(codes.InvalidArgument, "unexpected \"%v\", value: \"%v\"", PaymentTypeHeader, paymentType)
×
UNCOV
351
        }
×
352

353
        zap.L().Debug("Return payment handler by type", zap.Any("paymentType", paymentType))
6✔
354
        return paymentHandler, nil
6✔
355
}
356

357
// GetBigInt gets big.Int value from gRPC metadata
358
func GetBigInt(md metadata.MD, key string) (value *big.Int, err *GrpcError) {
31✔
359
        str, err := GetSingleValue(md, key)
31✔
360
        if err != nil {
36✔
361
                return
5✔
362
        }
5✔
363

364
        value = big.NewInt(0)
26✔
365
        e := value.UnmarshalText([]byte(str))
26✔
366
        if e != nil {
27✔
367
                return nil, NewGrpcErrorf(codes.InvalidArgument, "incorrect format \"%v\": \"%v\"", key, str)
1✔
368
        }
1✔
369

370
        return
25✔
371
}
372

373
// GetBytes gets bytes array value from gRPC metadata for key with '-bin'
374
// suffix, internally this data is encoded as base64
375
func GetBytes(md metadata.MD, key string) (result []byte, err *GrpcError) {
13✔
376
        if !strings.HasSuffix(key, "-bin") {
14✔
377
                return nil, NewGrpcErrorf(codes.InvalidArgument, "incorrect binary key name \"%v\"", key)
1✔
378
        }
1✔
379

380
        str, err := GetSingleValue(md, key)
12✔
381
        if err != nil {
13✔
382
                return
1✔
383
        }
1✔
384

385
        return []byte(str), nil
11✔
386
}
387

388
// GetBytesFromHex gets bytes array value from gRPC metadata, bytes array is
389
// encoded as hex string
390
func GetBytesFromHex(md metadata.MD, key string) (value []byte, err *GrpcError) {
4✔
391
        str, err := GetSingleValue(md, key)
4✔
392
        if err != nil {
6✔
393
                return
2✔
394
        }
2✔
395
        return common.FromHex(str), nil
2✔
396
}
397

398
// GetSingleValue gets string value from gRPC metadata
399
func GetSingleValue(md metadata.MD, key string) (value string, err *GrpcError) {
55✔
400
        array := md.Get(key)
55✔
401

55✔
402
        if len(array) == 0 {
65✔
403
                return "", NewGrpcErrorf(codes.InvalidArgument, "missing \"%v\"", key)
10✔
404
        }
10✔
405

406
        if len(array) > 1 {
47✔
407
                return "", NewGrpcErrorf(codes.InvalidArgument, "too many values for key \"%v\": %v", key, array)
2✔
408
        }
2✔
409

410
        return array[0], nil
43✔
411
}
412

413
// NoOpInterceptor is a gRPC interceptor which doesn't do payment checking.
414
func NoOpInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo,
415
        handler grpc.StreamHandler) error {
×
416
        return handler(srv, ss)
×
417
}
×
418

419
// set Additional details on the metrics persisted, this is to keep track of how many calls were made per channel
420
func setAdditionalDetails(context *GrpcStreamContext, stats *metrics.CommonStats) {
×
421
        md := context.MD
×
422
        if str, err := GetSingleValue(md, ClientTypeHeader); err == nil {
×
423
                stats.ClientType = str
×
424
        }
×
425
        if str, err := GetSingleValue(md, UserInfoHeader); err == nil {
×
426
                stats.UserDetails = str
×
427
        }
×
428
        if str, err := GetSingleValue(md, UserAgentHeader); err == nil {
×
429
                stats.UserAgent = str
×
UNCOV
430
        }
×
UNCOV
431
        if str, err := GetSingleValue(md, PaymentChannelIDHeader); err == nil {
×
UNCOV
432
                stats.ChannelId = str
×
UNCOV
433
        }
×
UNCOV
434
        if str, err := GetSingleValue(md, FreeCallUserIdHeader); err == nil {
×
UNCOV
435
                stats.UserName = str
×
UNCOV
436
        }
×
UNCOV
437
        if str, err := GetSingleValue(md, PaymentTypeHeader); err == nil {
×
UNCOV
438
                stats.PaymentMode = str
×
UNCOV
439
        }
×
440
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc