• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bavix / gripmock / 17066567963

19 Aug 2025 10:15AM UTC coverage: 52.632% (-0.9%) from 53.531%
17066567963

push

github

web-flow
Merge pull request #653 from bavix/652-field-outputheaders-not-being-sent-as-trailers-if-outputcode-0

Fix Trailers

78 of 190 new or added lines in 8 files covered. (41.05%)

7 existing lines in 1 file now uncovered.

1590 of 3021 relevant lines covered (52.63%)

30.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.97
/internal/app/grpc_server.go
1
package app
2

3
//nolint:revive
4
import (
5
        "context"
6
        "encoding/base64"
7
        "fmt"
8
        "io"
9
        "reflect"
10
        "slices"
11
        "strings"
12
        "time"
13

14
        "github.com/cockroachdb/errors"
15
        "github.com/goccy/go-json"
16
        "github.com/gripmock/stuber"
17
        "github.com/gripmock/types"
18
        "github.com/rs/zerolog"
19
        "google.golang.org/grpc"
20
        "google.golang.org/grpc/codes"
21
        _ "google.golang.org/grpc/encoding/gzip"
22
        "google.golang.org/grpc/health"
23
        healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
24
        "google.golang.org/grpc/metadata"
25
        "google.golang.org/grpc/peer"
26
        "google.golang.org/grpc/reflection"
27
        "google.golang.org/grpc/status"
28
        "google.golang.org/protobuf/encoding/protojson"
29
        "google.golang.org/protobuf/proto"
30
        "google.golang.org/protobuf/reflect/protoreflect"
31
        "google.golang.org/protobuf/reflect/protoregistry"
32
        "google.golang.org/protobuf/types/descriptorpb"
33
        "google.golang.org/protobuf/types/dynamicpb"
34

35
        protoloc "github.com/bavix/gripmock/v3/internal/domain/proto"
36
        "github.com/bavix/gripmock/v3/internal/domain/protoset"
37
        "github.com/bavix/gripmock/v3/internal/infra/grpccontext"
38
)
39

40
// excludedHeaders lists the incoming metadata keys that processHeaders drops,
// so they never reach the header map used for stub matching.
//
//nolint:gochecknoglobals
var excludedHeaders = []string{
	":authority",
	"content-type",
	"grpc-accept-encoding",
	"user-agent",
	"accept-encoding",
}
50

51
// processHeaders converts metadata to headers map, excluding specified headers.
52
func processHeaders(md metadata.MD) map[string]any {
132✔
53
        if len(md) == 0 {
132✔
54
                return nil
×
55
        }
×
56

57
        headers := make(map[string]any)
132✔
58

132✔
59
        for k, v := range md {
666✔
60
                if !slices.Contains(excludedHeaders, k) {
540✔
61
                        headers[k] = strings.Join(v, ";")
6✔
62
                }
6✔
63
        }
64

65
        return headers
132✔
66
}
67

68
// sendStreamMessage sends a message on a gRPC stream with error handling.
69
func sendStreamMessage(stream grpc.ServerStream, msg *dynamicpb.Message) error {
33✔
70
        if err := stream.SendMsg(msg); err != nil {
33✔
71
                return errors.Wrap(err, "failed to send response")
×
72
        }
×
73

74
        return nil
33✔
75
}
76

77
// receiveStreamMessage receives a message from a gRPC stream with error handling.
78
func receiveStreamMessage(stream grpc.ServerStream, msg *dynamicpb.Message) error {
78✔
79
        err := stream.RecvMsg(msg)
78✔
80
        if err != nil {
97✔
81
                return errors.Wrap(err, "failed to receive message")
19✔
82
        }
19✔
83

84
        return nil
59✔
85
}
86

87
const serviceReflection = "grpc.reflection.v1.ServerReflection"
88

89
// GRPCServer holds the configuration and collaborators of the mock gRPC server.
//
// NOTE(review): the methods that use these fields are outside this excerpt;
// the field roles below are inferred from names and types only — confirm.
type GRPCServer struct {
	network     string                // presumably the listener network (e.g. "tcp") — verify
	address     string                // presumably the listen address — verify
	params      *protoloc.Arguments   // proto-related arguments
	budgerigar  *stuber.Budgerigar    // stub storage / matcher
	waiter      Extender              // project-defined Extender
	healthcheck *health.Server        // gRPC health service implementation
}
97

98
// grpcMocker serves a single mocked gRPC method by looking up stubs in the
// Budgerigar and converting between dynamic protobuf messages and stub data.
type grpcMocker struct {
	// budgerigar is queried for the stub matching each incoming request.
	budgerigar *stuber.Budgerigar

	// inputDesc describes the request message; used to allocate dynamic requests.
	inputDesc protoreflect.MessageDescriptor
	// outputDesc describes the response message; used to allocate dynamic responses.
	outputDesc protoreflect.MessageDescriptor

	// fullServiceName is the service name placed into stub queries.
	fullServiceName string
	// serviceName is the service name used in diagnostic error messages.
	serviceName string
	// methodName is the method name placed into stub queries.
	methodName string
	// fullMethod is reported to interceptors as the FullMethod of the call.
	fullMethod string

	// serverStream and clientStream select which streaming handler is used.
	serverStream bool
	clientStream bool
}
112

113
func (m *grpcMocker) streamHandler(srv any, stream grpc.ServerStream) error {
25✔
114
        info := &grpc.StreamServerInfo{
25✔
115
                FullMethod:     m.fullMethod,
25✔
116
                IsClientStream: m.clientStream,
25✔
117
                IsServerStream: m.serverStream,
25✔
118
        }
25✔
119

25✔
120
        handler := func(_ any, stream grpc.ServerStream) error {
50✔
121
                switch {
25✔
122
                case m.serverStream && !m.clientStream:
6✔
123
                        return m.handleServerStream(stream)
6✔
124
                case !m.serverStream && m.clientStream:
14✔
125
                        return m.handleClientStream(stream)
14✔
126
                case m.serverStream && m.clientStream:
5✔
127
                        return m.handleBidiStream(stream)
5✔
128
                default:
×
129
                        return status.Errorf(codes.Unimplemented, "Unknown stream type")
×
130
                }
131
        }
132

133
        return grpc.StreamServerInterceptor(func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
50✔
134
                return handler(srv, ss)
25✔
135
        })(srv, stream, info, handler)
25✔
136
}
137

138
func (m *grpcMocker) newQuery(ctx context.Context, msg *dynamicpb.Message) stuber.Query {
7✔
139
        query := stuber.Query{
7✔
140
                Service: m.fullServiceName,
7✔
141
                Method:  m.methodName,
7✔
142
                Data:    convertToMap(msg),
7✔
143
        }
7✔
144

7✔
145
        md, ok := metadata.FromIncomingContext(ctx)
7✔
146
        if ok {
14✔
147
                query.Headers = processHeaders(md)
7✔
148
        }
7✔
149

150
        return query
7✔
151
}
152

153
// newQueryV2 creates a new V2 query for improved performance.
154
func (m *grpcMocker) newQueryV2(ctx context.Context, msg *dynamicpb.Message) stuber.QueryV2 {
68✔
155
        query := stuber.QueryV2{
68✔
156
                Service: m.fullServiceName,
68✔
157
                Method:  m.methodName,
68✔
158
                Input:   []map[string]any{convertToMap(msg)},
68✔
159
        }
68✔
160

68✔
161
        md, ok := metadata.FromIncomingContext(ctx)
68✔
162
        if ok {
136✔
163
                query.Headers = processHeaders(md)
68✔
164
        }
68✔
165

166
        return query
68✔
167
}
168

169
// newQueryBidi creates a new bidirectional streaming query.
170
func (m *grpcMocker) newQueryBidi(ctx context.Context) stuber.QueryBidi {
5✔
171
        query := stuber.QueryBidi{
5✔
172
                Service: m.fullServiceName,
5✔
173
                Method:  m.methodName,
5✔
174
        }
5✔
175

5✔
176
        md, ok := metadata.FromIncomingContext(ctx)
5✔
177
        if ok {
10✔
178
                query.Headers = processHeaders(md)
5✔
179
        }
5✔
180

181
        return query
5✔
182
}
183

184
func convertToMap(msg proto.Message) map[string]any {
170✔
185
        if msg == nil {
170✔
186
                return nil
×
187
        }
×
188

189
        result := make(map[string]any)
170✔
190
        message := msg.ProtoReflect()
170✔
191

170✔
192
        message.Range(func(fd protoreflect.FieldDescriptor, value protoreflect.Value) bool {
567✔
193
                if !message.Has(fd) {
397✔
194
                        return true
×
195
                }
×
196

197
                fieldName := string(fd.Name())
397✔
198
                result[fieldName] = convertValue(fd, value)
397✔
199

397✔
200
                return true
397✔
201
        })
202

203
        return result
170✔
204
}
205

206
func convertValue(fd protoreflect.FieldDescriptor, value protoreflect.Value) any {
397✔
207
        switch {
397✔
208
        case fd.IsList():
10✔
209
                return convertList(fd, value.List())
10✔
210
        case fd.IsMap():
1✔
211
                return convertMap(fd, value.Map())
1✔
212
        default:
386✔
213
                return convertScalar(fd, value)
386✔
214
        }
215
}
216

217
func convertList(fd protoreflect.FieldDescriptor, list protoreflect.List) []any {
10✔
218
        result := make([]any, list.Len())
10✔
219
        elemType := fd.Message()
10✔
220

10✔
221
        for i := range list.Len() {
37✔
222
                elem := list.Get(i)
27✔
223

27✔
224
                if elemType != nil {
33✔
225
                        result[i] = convertToMap(elem.Message().Interface())
6✔
226
                } else {
27✔
227
                        result[i] = convertScalar(fd, elem)
21✔
228
                }
21✔
229
        }
230

231
        return result
10✔
232
}
233

234
func convertMap(fd protoreflect.FieldDescriptor, m protoreflect.Map) map[string]any {
1✔
235
        result := make(map[string]any)
1✔
236
        keyType := fd.MapKey()
1✔
237
        valType := fd.MapValue().Message()
1✔
238

1✔
239
        m.Range(func(key protoreflect.MapKey, val protoreflect.Value) bool {
4✔
240
                convertedKey, ok := convertScalar(keyType, key.Value()).(string)
3✔
241
                if !ok {
3✔
242
                        return true
×
243
                }
×
244

245
                if valType != nil {
3✔
246
                        result[convertedKey] = convertToMap(val.Message().Interface())
×
247
                } else {
3✔
248
                        result[convertedKey] = convertScalar(fd.MapValue(), val)
3✔
249
                }
3✔
250

251
                return true
3✔
252
        })
253

254
        return result
1✔
255
}
256

257
//nolint:cyclop
258
func convertScalar(fd protoreflect.FieldDescriptor, value protoreflect.Value) any {
413✔
259
        const nullValue = "google.protobuf.NullValue"
413✔
260

413✔
261
        switch fd.Kind() {
413✔
262
        case protoreflect.BoolKind:
×
263
                return value.Bool()
×
264
        case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
47✔
265
                return json.Number(value.String())
47✔
266
        case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
15✔
267
                return json.Number(value.String())
15✔
268
        case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
×
269
                return json.Number(value.String())
×
270
        case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
×
271
                return json.Number(value.String())
×
272
        case protoreflect.FloatKind:
3✔
273
                return json.Number(value.String())
3✔
274
        case protoreflect.DoubleKind:
44✔
275
                return json.Number(value.String())
44✔
276
        case protoreflect.StringKind:
257✔
277
                return value.String()
257✔
278
        case protoreflect.BytesKind:
4✔
279
                return base64.StdEncoding.EncodeToString(value.Bytes())
4✔
280
        case protoreflect.EnumKind:
38✔
281
                if fd.Enum().FullName() == nullValue {
38✔
282
                        return nil
×
283
                }
×
284

285
                desc := fd.Enum().Values().ByNumber(value.Enum())
38✔
286
                if desc != nil {
76✔
287
                        return string(desc.Name())
38✔
288
                }
38✔
289

290
                return ""
×
291
        case protoreflect.MessageKind, protoreflect.GroupKind:
5✔
292
                return convertToMap(value.Message().Interface())
5✔
293
        default:
×
294
                return nil
×
295
        }
296
}
297

298
func (m *grpcMocker) delay(ctx context.Context, delayDur types.Duration) {
112✔
299
        if delayDur == 0 {
224✔
300
                return
112✔
301
        }
112✔
302

303
        timer := time.NewTimer(time.Duration(delayDur))
×
304
        defer timer.Stop()
×
305

×
306
        select {
×
307
        case <-ctx.Done():
×
308
                return
×
309
        case <-timer.C:
×
310
                return
×
311
        }
312
}
313

314
//nolint:nestif,cyclop,funlen
315
func (m *grpcMocker) handleServerStream(stream grpc.ServerStream) error {
6✔
316
        inputMsg := dynamicpb.NewMessage(m.inputDesc)
6✔
317

6✔
318
        err := stream.RecvMsg(inputMsg)
6✔
319
        if errors.Is(err, io.EOF) {
6✔
320
                return nil
×
321
        }
×
322

323
        if err != nil {
6✔
324
                return errors.Wrap(err, "failed to receive message")
×
325
        }
×
326

327
        query := m.newQuery(stream.Context(), inputMsg)
6✔
328

6✔
329
        result, err := m.budgerigar.FindByQuery(query)
6✔
330
        if err != nil {
6✔
331
                return errors.Wrap(err, "failed to find response")
×
332
        }
×
333

334
        found := result.Found()
6✔
335
        if found == nil {
6✔
336
                return status.Errorf(codes.NotFound, "No response found: %v", result.Similar())
×
337
        }
×
338

339
        // Process dynamic templates if the output contains them
340
        if found.Output.HasTemplates() {
7✔
341
                requestData := convertToMap(inputMsg)
1✔
342

1✔
343
                headers := make(map[string]any)
1✔
344
                if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
2✔
345
                        headers = processHeaders(md)
1✔
346
                }
1✔
347

348
                if err := found.Output.ProcessDynamicOutput(requestData, headers, 0, nil); err != nil {
1✔
349
                        return status.Error(codes.Internal, fmt.Sprintf("failed to process dynamic templates: %v", err))
×
350
                }
×
351
        }
352

353
        // Set headers once at the beginning
354
        if found.Output.Headers != nil {
6✔
355
                mdResp := make(metadata.MD, len(found.Output.Headers))
×
356
                for k, v := range found.Output.Headers {
×
357
                        mdResp.Append(k, strings.Split(v, ";")...)
×
358
                }
×
359

360
                if err := stream.SetHeader(mdResp); err != nil {
×
361
                        return errors.Wrap(err, "failed to set headers")
×
362
                }
×
363
        }
364

365
        // For server streaming, if Stream is not empty, send it first, then throw error if specified
366
        if found.IsServerStream() {
10✔
367
                if len(found.Output.Stream) > 0 {
8✔
368
                        if err := m.handleArrayStreamData(stream, found); err != nil {
4✔
369
                                return err
×
370
                        }
×
371

372
                        // After sending the stream, if output.error is set, return it now
373
                        if err := m.handleOutputError(stream.Context(), stream, found.Output); err != nil { //nolint:wrapcheck
4✔
374
                                return err
×
375
                        }
×
376

377
                        return nil
4✔
378
                }
379

380
                // If stream is empty and error is specified, return it immediately
NEW
381
                if err := m.handleOutputError(stream.Context(), stream, found.Output); err != nil { //nolint:wrapcheck
×
382
                        return err
×
383
                }
×
384

385
                // Fallback: no stream and no error – treat as single message
386
                return m.handleNonArrayStreamData(stream, found)
×
387
        }
388

389
        // Fallback to Data for single message streaming
390
        return m.handleNonArrayStreamData(stream, found)
2✔
391
}
392

393
func (m *grpcMocker) handleArrayStreamData(stream grpc.ServerStream, found *stuber.Stub) error {
4✔
394
        // Store context done channel outside the loop for clarity; context.Done() is already cached
4✔
395
        done := stream.Context().Done()
4✔
396

4✔
397
        // Send all messages, validating each element incrementally
4✔
398
        for i, streamData := range found.Output.Stream {
18✔
399
                select {
14✔
400
                case <-done:
×
401
                        return stream.Context().Err()
×
402
                default:
14✔
403
                }
404

405
                // Validate type of each streamData just before sending
406
                outputData, ok := streamData.(map[string]any)
14✔
407
                if !ok {
14✔
408
                        return status.Errorf(
×
409
                                codes.Internal,
×
410
                                "invalid data format in stream array at index %d: got %T, expected map[string]any",
×
411
                                i, streamData,
×
412
                        )
×
413
                }
×
414

415
                // Apply delay before sending each message
416
                m.delay(stream.Context(), found.Output.Delay)
14✔
417

14✔
418
                outputMsg, err := m.newOutputMessage(outputData)
14✔
419
                if err != nil {
14✔
420
                        return errors.Wrap(err, "failed to convert response to dynamic message")
×
421
                }
×
422

423
                if err := sendStreamMessage(stream, outputMsg); err != nil {
14✔
424
                        return err
×
425
                }
×
426
        }
427

428
        return nil
4✔
429
}
430

431
func (m *grpcMocker) handleNonArrayStreamData(stream grpc.ServerStream, found *stuber.Stub) error {
2✔
432
        // Original behavior for non-array data, with context cancellation check
2✔
433
        done := stream.Context().Done()
2✔
434

2✔
435
        for {
4✔
436
                select {
2✔
437
                case <-done:
×
438
                        return stream.Context().Err()
×
439
                default:
2✔
440
                }
441

442
                m.delay(stream.Context(), found.Output.Delay)
2✔
443

2✔
444
                outputMsg, err := m.newOutputMessage(found.Output.Data)
2✔
445
                if err != nil {
2✔
446
                        return errors.Wrap(err, "failed to convert response to dynamic message")
×
447
                }
×
448

449
                if err := sendStreamMessage(stream, outputMsg); err != nil {
2✔
450
                        return err //nolint:wrapcheck
×
451
                }
×
452

453
                // In server streaming, do not receive further messages from the client after the initial request.
454
                // The server should only send messages to the client.
455
                // Check for EOF to determine if client has closed the stream
456
                if err := stream.RecvMsg(nil); err != nil {
4✔
457
                        if errors.Is(err, io.EOF) {
4✔
458
                                return nil
2✔
459
                        }
2✔
460

461
                        return errors.Wrap(err, "failed to receive message")
×
462
                }
463
        }
464
}
465

466
func (m *grpcMocker) newOutputMessage(data map[string]any) (*dynamicpb.Message, error) {
104✔
467
        jsonData, err := json.Marshal(data)
104✔
468
        if err != nil {
104✔
469
                return nil, fmt.Errorf("failed to marshal map to JSON: %w", err)
×
470
        }
×
471

472
        msg := dynamicpb.NewMessage(m.outputDesc)
104✔
473

104✔
474
        err = protojson.Unmarshal(jsonData, msg)
104✔
475
        if err != nil {
104✔
476
                return nil, fmt.Errorf("failed to unmarshal JSON into dynamic message: %w", err)
×
477
        }
×
478

479
        return msg, nil
104✔
480
}
481

482
func (m *grpcMocker) unaryHandler() grpc.MethodHandler {
42✔
483
        return func(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
110✔
484
                req := dynamicpb.NewMessage(m.inputDesc)
68✔
485
                if err := dec(req); err != nil {
68✔
486
                        return nil, err //nolint:wrapcheck
×
487
                }
×
488

489
                if interceptor != nil {
136✔
490
                        return interceptor(ctx, req, &grpc.UnaryServerInfo{
68✔
491
                                Server:     srv,
68✔
492
                                FullMethod: m.fullMethod,
68✔
493
                        }, func(ctx context.Context, req any) (any, error) {
136✔
494
                                if msg, ok := req.(*dynamicpb.Message); ok {
136✔
495
                                        return m.handleUnary(ctx, msg)
68✔
496
                                }
68✔
497

498
                                return nil, status.Errorf(codes.InvalidArgument, "expected *dynamicpb.Message, got %T", req)
×
499
                        })
500
                }
501

502
                return m.handleUnary(ctx, req)
×
503
        }
504
}
505

506
//nolint:cyclop
507
func (m *grpcMocker) handleUnary(ctx context.Context, req *dynamicpb.Message) (*dynamicpb.Message, error) {
68✔
508
        // Try V2 API first for better performance
68✔
509
        queryV2 := m.newQueryV2(ctx, req)
68✔
510

68✔
511
        result, err := m.budgerigar.FindByQueryV2(queryV2)
68✔
512
        if err != nil {
69✔
513
                // Fallback to V1 API for backward compatibility
1✔
514
                query := m.newQuery(ctx, req)
1✔
515

1✔
516
                result, err = m.budgerigar.FindByQuery(query)
1✔
517
                if err != nil {
1✔
518
                        return nil, err //nolint:wrapcheck
×
519
                }
×
520
        }
521

522
        found := result.Found()
68✔
523
        if found == nil {
69✔
524
                // Use appropriate error function based on which API was used
1✔
525
                if queryV2.Service != "" {
2✔
526
                        errorFormatter := NewErrorFormatter()
1✔
527

1✔
528
                        return nil, status.Error(codes.NotFound, errorFormatter.FormatStubNotFoundErrorV2(queryV2, result).Error())
1✔
529
                }
1✔
530

531
                // Fallback to V1 error format
532
                query := m.newQuery(ctx, req)
×
533

×
534
                return nil, status.Error(codes.NotFound, stubNotFoundError(query, result).Error())
×
535
        }
536

537
        m.delay(ctx, found.Output.Delay)
67✔
538

67✔
539
        // Process dynamic templates on a deep copy to avoid mutating the stub
67✔
540
        outputToUse := found.Output
67✔
541
        if found.Output.HasTemplates() || found.Output.Error != "" {
83✔
542
                requestData := convertToMap(req)
16✔
543

16✔
544
                headers := make(map[string]any)
16✔
545
                if md, ok := metadata.FromIncomingContext(ctx); ok {
32✔
546
                        headers = processHeaders(md)
16✔
547
                }
16✔
548

549
                // deep copy Output fields
550
                outputToUse.Data = deepCopyMapAny(found.Output.Data)
16✔
551
                outputToUse.Stream = deepCopySliceAny(found.Output.Stream)
16✔
552
                outputToUse.Headers = deepCopyStringMap(found.Output.Headers)
16✔
553

16✔
554
                if err := outputToUse.ProcessDynamicOutput(requestData, headers, 0, nil); err != nil {
16✔
555
                        return nil, status.Error(codes.Internal, fmt.Sprintf("failed to process dynamic templates: %v", err))
×
556
                }
×
557
        }
558

559
        // Always send headers first (both for success and error cases)
560
        if err := m.setResponseHeadersAny(ctx, nil, outputToUse.Headers); err != nil {
67✔
UNCOV
561
                return nil, err //nolint:wrapcheck
×
UNCOV
562
        }
×
563

564
        if err := m.handleOutputError(ctx, nil, outputToUse); err != nil {
76✔
565
                return nil, err //nolint:wrapcheck
9✔
566
        }
9✔
567

568
        outputMsg, err := m.newOutputMessage(outputToUse.Data)
58✔
569
        if err != nil {
58✔
570
                return nil, err //nolint:wrapcheck
×
571
        }
×
572

573
        return outputMsg, nil
58✔
574
}
575

576
// buildResponseMetadata builds gRPC metadata from headers map.
577
func buildResponseMetadata(headers map[string]string) (metadata.MD, bool) {
87✔
578
        if len(headers) == 0 {
174✔
579
                return nil, false
87✔
580
        }
87✔
581

NEW
582
        mdResp := make(metadata.MD, len(headers))
×
NEW
583
        for k, v := range headers {
×
NEW
584
                mdResp.Append(k, strings.Split(v, ";")...)
×
UNCOV
585
        }
×
586

NEW
587
        return mdResp, true
×
588
}
589

590
// setResponseHeadersAny sets headers for success responses.
591
func (m *grpcMocker) setResponseHeadersAny(ctx context.Context, stream grpc.ServerStream, headers map[string]string) error {
87✔
592
        mdResp, ok := buildResponseMetadata(headers)
87✔
593
        if !ok {
174✔
594
                return nil
87✔
595
        }
87✔
596

NEW
597
        if stream != nil {
×
NEW
598
                return stream.SetHeader(mdResp)
×
UNCOV
599
        }
×
600

601
        return grpc.SetHeader(ctx, mdResp)
×
602
}
603

604
func (m *grpcMocker) handleOutputError(_ context.Context, _ grpc.ServerStream, output stuber.Output) error {
100✔
605
        if output.Error != "" || output.Code != nil {
111✔
606
                if output.Code == nil {
11✔
NEW
607
                        return status.Error(codes.Aborted, output.Error)
×
NEW
608
                }
×
609

610
                if *output.Code != codes.OK {
21✔
611
                        return status.Error(*output.Code, output.Error)
10✔
612
                }
10✔
613
        }
614

615
        return nil
90✔
616
}
617

618
// tryV2API attempts to find a stub using V2 API.
619
func (m *grpcMocker) tryV2API(messages []map[string]any, md metadata.MD) (*stuber.Result, error) {
14✔
620
        queryV2 := stuber.QueryV2{
14✔
621
                Service: m.fullServiceName,
14✔
622
                Method:  m.methodName,
14✔
623
                Input:   messages,
14✔
624
        }
14✔
625

14✔
626
        // Add headers to V2 query
14✔
627
        if len(md) > 0 {
28✔
628
                queryV2.Headers = processHeaders(md)
14✔
629
        }
14✔
630

631
        return m.budgerigar.FindByQueryV2(queryV2)
14✔
632
}
633

634
// tryV1APIFallback attempts to find a stub using V1 API as fallback.
635
func (m *grpcMocker) tryV1APIFallback(messages []map[string]any, md metadata.MD) (*stuber.Result, error) {
3✔
636
        // Try each message individually (from last to first for better matching) using V1 API
3✔
637
        for i := len(messages) - 1; i >= 0; i-- {
6✔
638
                message := messages[i]
3✔
639

3✔
640
                query := stuber.Query{
3✔
641
                        Service: m.fullServiceName,
3✔
642
                        Method:  m.methodName,
3✔
643
                        Data:    message,
3✔
644
                }
3✔
645

3✔
646
                // Add headers to V1 query
3✔
647
                if len(md) > 0 {
6✔
648
                        query.Headers = processHeaders(md)
3✔
649
                }
3✔
650

651
                result, foundErr := m.budgerigar.FindByQuery(query)
3✔
652
                if foundErr == nil && result != nil && result.Found() != nil {
6✔
653
                        return result, nil
3✔
654
                }
3✔
655
        }
656

657
        return nil, status.Errorf(codes.NotFound, "failed to find response for client stream")
×
658
}
659

660
func (m *grpcMocker) handleClientStream(stream grpc.ServerStream) error {
14✔
661
        // Collect all messages from client
14✔
662
        messages, err := m.collectClientMessages(stream)
14✔
663
        if err != nil {
14✔
664
                return err
×
665
        }
×
666

667
        // Try to find stub
668
        found, err := m.tryFindStub(stream, messages)
14✔
669
        if err != nil {
14✔
670
                return err
×
671
        }
×
672

673
        // Send response
674
        return m.sendClientStreamResponse(stream, found, messages)
14✔
675
}
676

677
// collectClientMessages collects all messages from the client stream.
678
func (m *grpcMocker) collectClientMessages(stream grpc.ServerStream) ([]map[string]any, error) {
14✔
679
        var messages []map[string]any
14✔
680

14✔
681
        for {
72✔
682
                inputMsg := dynamicpb.NewMessage(m.inputDesc)
58✔
683

58✔
684
                err := receiveStreamMessage(stream, inputMsg)
58✔
685
                if errors.Is(err, io.EOF) {
72✔
686
                        break
14✔
687
                }
688

689
                if err != nil {
44✔
690
                        return nil, err //nolint:wrapcheck
×
691
                }
×
692

693
                messages = append(messages, convertToMap(inputMsg))
44✔
694
        }
695

696
        return messages, nil
14✔
697
}
698

699
// tryFindStub attempts to find a matching stub using V2 API first, then falls back to V1 API.
700
func (m *grpcMocker) tryFindStub(stream grpc.ServerStream, messages []map[string]any) (*stuber.Stub, error) {
14✔
701
        // Add headers
14✔
702
        md, _ := metadata.FromIncomingContext(stream.Context())
14✔
703

14✔
704
        // Try V2 API first
14✔
705
        result, foundErr := m.tryV2API(messages, md)
14✔
706

14✔
707
        // If V2 API fails, try V1 API for backward compatibility
14✔
708
        if foundErr != nil || result == nil || result.Found() == nil {
17✔
709
                result, foundErr = m.tryV1APIFallback(messages, md)
3✔
710
        }
3✔
711

712
        if foundErr != nil || result == nil || result.Found() == nil {
14✔
713
                // Return an error message with service and method context to aid debugging
×
714
                errorMsg := fmt.Sprintf("Failed to find response for client stream (service: %s, method: %s)", m.serviceName, m.methodName)
×
715
                if foundErr != nil {
×
716
                        errorMsg += fmt.Sprintf(" - Error: %v", foundErr)
×
717
                }
×
718

719
                return nil, status.Error(codes.NotFound, errorMsg)
×
720
        }
721

722
        found := result.Found()
14✔
723
        if found == nil {
14✔
724
                return nil, status.Errorf(codes.NotFound, "No response found for client stream: %v", result.Similar())
×
725
        }
×
726

727
        return found, nil
14✔
728
}
729

730
// sendClientStreamResponse sends the response for client streaming.
731
//
732
//nolint:nestif,cyclop
733
func (m *grpcMocker) sendClientStreamResponse(stream grpc.ServerStream, found *stuber.Stub, messages []map[string]any) error {
14✔
734
        m.delay(stream.Context(), found.Output.Delay)
14✔
735

14✔
736
        // Process dynamic templates if the output contains them
14✔
737
        if found.Output.HasTemplates() {
24✔
738
                // For client streaming, we can use different strategies:
10✔
739
                // 1. Use the last message (most common case)
10✔
740
                // 2. Use all messages as an array
10✔
741
                // 3. Use a combined approach
10✔
742
                var requestData map[string]any
10✔
743

10✔
744
                if len(messages) > 0 {
20✔
745
                        // Use the last non-empty message as primary data
10✔
746
                        for i := len(messages) - 1; i >= 0; i-- {
20✔
747
                                if len(messages[i]) > 0 {
20✔
748
                                        requestData = messages[i]
10✔
749

10✔
750
                                        break
10✔
751
                                }
752
                        }
753

754
                        if requestData == nil {
10✔
755
                                requestData = make(map[string]any)
×
756
                        }
×
757
                } else {
×
758
                        requestData = make(map[string]any)
×
759
                }
×
760

761
                headers := make(map[string]any)
10✔
762
                if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
20✔
763
                        headers = processHeaders(md)
10✔
764
                }
10✔
765

766
                // Provide only non-empty client messages to template context via .Requests
767
                allMessages := make([]any, 0, len(messages))
10✔
768
                for _, m := range messages {
37✔
769
                        if len(m) == 0 {
27✔
770
                                continue
×
771
                        }
772

773
                        allMessages = append(allMessages, m)
27✔
774
                }
775

776
                if err := found.Output.ProcessDynamicOutput(requestData, headers, 0, allMessages); err != nil {
10✔
777
                        return status.Error(codes.Internal, fmt.Sprintf("failed to process dynamic templates: %v", err))
×
778
                }
×
779
        }
780

781
        // Handle headers
782
        // If the output specifies an error, return it instead of sending a message
783
        if err := m.handleOutputError(stream.Context(), stream, found.Output); err != nil { //nolint:wrapcheck
15✔
784
                return err
1✔
785
        }
1✔
786

787
        if err := m.setResponseHeadersAny(stream.Context(), stream, found.Output.Headers); err != nil {
13✔
788
                return errors.Wrap(err, "failed to set headers")
×
789
        }
×
790

791
        // Send response
792
        outputMsg, err := m.newOutputMessage(found.Output.Data)
13✔
793
        if err != nil {
13✔
794
                return errors.Wrap(err, "failed to convert response to dynamic message")
×
795
        }
×
796

797
        return stream.SendMsg(outputMsg)
13✔
798
}
799

800
//nolint:cyclop
801
func (m *grpcMocker) handleBidiStream(stream grpc.ServerStream) error {
5✔
802
        // Initialize bidirectional streaming session
5✔
803
        queryBidi := m.newQueryBidi(stream.Context())
5✔
804

5✔
805
        bidiResult, err := m.budgerigar.FindByQueryBidi(queryBidi)
5✔
806
        if err != nil {
5✔
807
                return errors.Wrapf(err, "failed to initialize bidirectional streaming session: %v", err)
×
808
        }
×
809

810
        for {
25✔
811
                inputMsg := dynamicpb.NewMessage(m.inputDesc)
20✔
812

20✔
813
                err := receiveStreamMessage(stream, inputMsg)
20✔
814
                if errors.Is(err, io.EOF) {
25✔
815
                        return nil
5✔
816
                }
5✔
817

818
                if err != nil {
15✔
819
                        return err //nolint:wrapcheck
×
820
                }
×
821

822
                // Process message through bidirectional streaming
823
                stub, err := bidiResult.Next(convertToMap(inputMsg))
15✔
824
                if err != nil {
15✔
825
                        return errors.Wrap(err, "failed to process bidirectional message")
×
826
                }
×
827

828
                m.delay(stream.Context(), stub.Output.Delay)
15✔
829

15✔
830
                // Make a deep copy of the output and process dynamic templates per message
15✔
831
                outputToUse := stub.Output
15✔
832
                if stub.Output.HasTemplates() {
23✔
833
                        requestData := convertToMap(inputMsg)
8✔
834
                        // Add message index for bidirectional streaming
8✔
835
                        requestData["_message_index"] = bidiResult.GetMessageIndex()
8✔
836

8✔
837
                        headers := make(map[string]any)
8✔
838
                        if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
16✔
839
                                headers = processHeaders(md)
8✔
840
                        }
8✔
841

842
                        // Deep copy Output fields so each message is rendered independently
843
                        outputToUse.Data = deepCopyMapAny(stub.Output.Data)
8✔
844
                        outputToUse.Stream = deepCopySliceAny(stub.Output.Stream)
8✔
845
                        outputToUse.Headers = deepCopyStringMap(stub.Output.Headers)
8✔
846

8✔
847
                        if err := outputToUse.ProcessDynamicOutput(requestData, headers, bidiResult.GetMessageIndex(), nil); err != nil {
8✔
848
                                return status.Error(codes.Internal, fmt.Sprintf("failed to process dynamic templates: %v", err))
×
849
                        }
×
850
                }
851

852
                // Send headers only once at the beginning of the stream
853
                if bidiResult.GetMessageIndex() == 0 {
22✔
854
                        if err := m.setResponseHeadersAny(stream.Context(), stream, outputToUse.Headers); err != nil {
7✔
NEW
855
                                return errors.Wrap(err, "failed to set headers")
×
UNCOV
856
                        }
×
857
                }
858

859
                // If the output specifies an error, return it instead of sending a message
860
                if err := m.handleOutputError(stream.Context(), stream, outputToUse); err != nil { //nolint:wrapcheck
15✔
NEW
861
                        return err
×
UNCOV
862
                }
×
863

864
                // Send response(s) based on output configuration
865
                if err := m.sendBidiResponses(stream, outputToUse, stub, bidiResult.GetMessageIndex()); err != nil {
15✔
NEW
866
                        return err
×
UNCOV
867
                }
×
868
        }
869
}
870

871
func NewGRPCServer(
872
        network, address string,
873
        params *protoloc.Arguments,
874
        budgerigar *stuber.Budgerigar,
875
        waiter Extender,
876
) *GRPCServer {
1✔
877
        return &GRPCServer{
1✔
878
                network:    network,
1✔
879
                address:    address,
1✔
880
                params:     params,
1✔
881
                budgerigar: budgerigar,
1✔
882
                waiter:     waiter,
1✔
883
        }
1✔
884
}
1✔
885

886
func (s *GRPCServer) Build(ctx context.Context) (*grpc.Server, error) {
1✔
887
        descriptors, err := protoset.Build(ctx, s.params.Imports(), s.params.ProtoPath())
1✔
888
        if err != nil {
1✔
889
                return nil, errors.Wrap(err, "failed to build descriptors")
×
890
        }
×
891

892
        server := s.createServer(ctx)
1✔
893
        s.setupHealthCheck(server)
1✔
894
        s.registerServices(ctx, server, descriptors)
1✔
895
        s.startHealthCheckRoutine(ctx)
1✔
896

1✔
897
        return server, nil
1✔
898
}
899

900
func (s *GRPCServer) createServer(ctx context.Context) *grpc.Server {
1✔
901
        logger := zerolog.Ctx(ctx)
1✔
902

1✔
903
        return grpc.NewServer(
1✔
904
                grpc.ChainUnaryInterceptor(
1✔
905
                        grpccontext.PanicRecoveryUnaryInterceptor,
1✔
906
                        grpccontext.UnaryInterceptor(logger),
1✔
907
                        LogUnaryInterceptor,
1✔
908
                ),
1✔
909
                grpc.ChainStreamInterceptor(
1✔
910
                        grpccontext.PanicRecoveryStreamInterceptor,
1✔
911
                        grpccontext.StreamInterceptor(logger),
1✔
912
                        LogStreamInterceptor,
1✔
913
                ),
1✔
914
        )
1✔
915
}
1✔
916

917
func (s *GRPCServer) setupHealthCheck(server *grpc.Server) {
1✔
918
        healthcheck := health.NewServer()
1✔
919
        healthcheck.SetServingStatus("gripmock", healthgrpc.HealthCheckResponse_NOT_SERVING)
1✔
920
        healthgrpc.RegisterHealthServer(server, healthcheck)
1✔
921
        reflection.Register(server)
1✔
922

1✔
923
        // Store healthcheck server for later status updates
1✔
924
        s.healthcheck = healthcheck
1✔
925
}
1✔
926

927
func (s *GRPCServer) registerServices(ctx context.Context, server *grpc.Server, descriptors []*descriptorpb.FileDescriptorSet) {
1✔
928
        logger := zerolog.Ctx(ctx)
1✔
929

1✔
930
        for _, descriptor := range descriptors {
3✔
931
                for _, file := range descriptor.GetFile() {
42✔
932
                        for _, svc := range file.GetService() {
80✔
933
                                serviceDesc := s.createServiceDesc(file, svc)
40✔
934
                                s.registerServiceMethods(ctx, &serviceDesc, svc)
40✔
935
                                server.RegisterService(&serviceDesc, nil)
40✔
936
                                logger.Info().Str("service", serviceDesc.ServiceName).Msg("Registered gRPC service")
40✔
937
                        }
40✔
938
                }
939
        }
940
}
941

942
func (s *GRPCServer) createServiceDesc(file *descriptorpb.FileDescriptorProto, svc *descriptorpb.ServiceDescriptorProto) grpc.ServiceDesc {
40✔
943
        return grpc.ServiceDesc{
40✔
944
                ServiceName: getServiceName(file, svc),
40✔
945
                HandlerType: (*any)(nil),
40✔
946
        }
40✔
947
}
40✔
948

949
func (s *GRPCServer) registerServiceMethods(ctx context.Context, serviceDesc *grpc.ServiceDesc, svc *descriptorpb.ServiceDescriptorProto) {
40✔
950
        logger := zerolog.Ctx(ctx)
40✔
951

40✔
952
        for _, method := range svc.GetMethod() {
98✔
953
                inputDesc, err := getMessageDescriptor(method.GetInputType())
58✔
954
                if err != nil {
58✔
955
                        logger.Fatal().Err(err).Msg("Failed to get input message descriptor")
×
956
                }
×
957

958
                outputDesc, err := getMessageDescriptor(method.GetOutputType())
58✔
959
                if err != nil {
58✔
960
                        logger.Fatal().Err(err).Msg("Failed to get output message descriptor")
×
961
                }
×
962

963
                m := s.createGrpcMocker(serviceDesc, svc, method, inputDesc, outputDesc)
58✔
964

58✔
965
                if method.GetServerStreaming() || method.GetClientStreaming() {
74✔
966
                        serviceDesc.Streams = append(serviceDesc.Streams, grpc.StreamDesc{
16✔
967
                                StreamName:    method.GetName(),
16✔
968
                                Handler:       m.streamHandler,
16✔
969
                                ServerStreams: m.serverStream,
16✔
970
                                ClientStreams: m.clientStream,
16✔
971
                        })
16✔
972
                } else {
58✔
973
                        serviceDesc.Methods = append(serviceDesc.Methods, grpc.MethodDesc{
42✔
974
                                MethodName: method.GetName(),
42✔
975
                                Handler:    m.unaryHandler(),
42✔
976
                        })
42✔
977
                }
42✔
978
        }
979
}
980

981
func (s *GRPCServer) createGrpcMocker(
982
        serviceDesc *grpc.ServiceDesc,
983
        svc *descriptorpb.ServiceDescriptorProto,
984
        method *descriptorpb.MethodDescriptorProto,
985
        inputDesc, outputDesc protoreflect.MessageDescriptor,
986
) *grpcMocker {
58✔
987
        return &grpcMocker{
58✔
988
                budgerigar: s.budgerigar,
58✔
989

58✔
990
                inputDesc:  inputDesc,
58✔
991
                outputDesc: outputDesc,
58✔
992

58✔
993
                fullServiceName: serviceDesc.ServiceName,
58✔
994
                serviceName:     svc.GetName(),
58✔
995
                methodName:      method.GetName(),
58✔
996
                fullMethod:      fmt.Sprintf("/%s/%s", serviceDesc.ServiceName, method.GetName()),
58✔
997

58✔
998
                serverStream: method.GetServerStreaming(),
58✔
999
                clientStream: method.GetClientStreaming(),
58✔
1000
        }
58✔
1001
}
58✔
1002

1003
func (s *GRPCServer) startHealthCheckRoutine(ctx context.Context) {
1✔
1004
        logger := zerolog.Ctx(ctx)
1✔
1005

1✔
1006
        go func() {
2✔
1007
                defer func() {
2✔
1008
                        if r := recover(); r != nil {
1✔
NEW
1009
                                logger.Error().
×
NEW
1010
                                        Interface("panic", r).
×
NEW
1011
                                        Msg("Panic recovered in health check routine")
×
NEW
1012
                        }
×
1013
                }()
1014

1015
                s.waiter.Wait(ctx)
1✔
1016

1✔
1017
                select {
1✔
1018
                case <-ctx.Done():
×
1019
                        return
×
1020
                default:
1✔
1021
                        logger.Info().Msg("gRPC server is ready to accept requests")
1✔
1022
                        s.healthcheck.SetServingStatus("gripmock", healthgrpc.HealthCheckResponse_SERVING)
1✔
1023
                }
1024
        }()
1025
}
1026

1027
// getServiceName constructs the fully qualified service name by combining the package name
1028
// and the service name. If the package name is empty, it returns only the service name,
1029
// avoiding a leading dot in the result.
1030
func getServiceName(file *descriptorpb.FileDescriptorProto, svc *descriptorpb.ServiceDescriptorProto) string {
40✔
1031
        if file.GetPackage() != "" {
78✔
1032
                return fmt.Sprintf("%s.%s", file.GetPackage(), svc.GetName())
38✔
1033
        }
38✔
1034

1035
        return svc.GetName()
2✔
1036
}
1037

1038
//nolint:ireturn
1039
func getMessageDescriptor(messageType string) (protoreflect.MessageDescriptor, error) {
116✔
1040
        msgName := protoreflect.FullName(strings.TrimPrefix(messageType, "."))
116✔
1041

116✔
1042
        desc, err := protoregistry.GlobalFiles.FindDescriptorByName(msgName)
116✔
1043
        if err != nil {
116✔
1044
                return nil, status.Errorf(codes.Internal, "Message descriptor not found: %v", err)
×
1045
        }
×
1046

1047
        msgDesc, ok := desc.(protoreflect.MessageDescriptor)
116✔
1048
        if !ok {
116✔
1049
                return nil, status.Errorf(codes.Internal, "Not a message descriptor: %s", msgName)
×
1050
        }
×
1051

1052
        return msgDesc, nil
116✔
1053
}
1054

1055
func LogUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
69✔
1056
        start := time.Now()
69✔
1057
        resp, err := handler(ctx, req)
69✔
1058

69✔
1059
        grpcPeer, _ := peer.FromContext(ctx)
69✔
1060
        service, method := splitMethodName(info.FullMethod)
69✔
1061

69✔
1062
        level := zerolog.InfoLevel
69✔
1063
        if service == serviceReflection {
69✔
1064
                level = zerolog.DebugLevel
×
1065
        }
×
1066

1067
        event := zerolog.Ctx(ctx).WithLevel(level).
69✔
1068
                Str("grpc.component", "server").
69✔
1069
                Str("grpc.method", method).
69✔
1070
                Str("grpc.method_type", "unary").
69✔
1071
                Str("grpc.service", service).
69✔
1072
                Str("grpc.code", status.Code(err).String()).
69✔
1073
                Dur("grpc.time_ms", time.Since(start)).
69✔
1074
                Str("peer.address", getPeerAddress(grpcPeer)).
69✔
1075
                Str("protocol", "grpc")
69✔
1076

69✔
1077
        if md, ok := metadata.FromIncomingContext(ctx); ok {
138✔
1078
                event.Interface("grpc.metadata", md)
69✔
1079
        }
69✔
1080

1081
        if content := protoToJSON(req); content != nil {
138✔
1082
                event.RawJSON("grpc.request.content", content)
69✔
1083
        }
69✔
1084

1085
        if content := protoToJSON(resp); content != nil {
128✔
1086
                event.RawJSON("grpc.response.content", content)
59✔
1087
        }
59✔
1088

1089
        event.Msg("gRPC call completed")
69✔
1090

69✔
1091
        return resp, err
69✔
1092
}
1093

1094
func LogStreamInterceptor(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
118✔
1095
        start := time.Now()
118✔
1096
        grpcPeer, _ := peer.FromContext(stream.Context())
118✔
1097
        service, method := splitMethodName(info.FullMethod)
118✔
1098

118✔
1099
        wrapped := &loggingStream{stream, []any{}, []any{}}
118✔
1100
        err := handler(srv, wrapped)
118✔
1101

118✔
1102
        level := zerolog.InfoLevel
118✔
1103
        if service == serviceReflection {
211✔
1104
                level = zerolog.DebugLevel
93✔
1105
        }
93✔
1106

1107
        zerolog.Ctx(stream.Context()).WithLevel(level).
118✔
1108
                Str("grpc.component", "server").
118✔
1109
                Str("grpc.method", method).
118✔
1110
                Str("grpc.method_type", "stream").
118✔
1111
                Str("grpc.service", service).
118✔
1112
                Str("grpc.code", status.Code(err).String()).
118✔
1113
                Dur("grpc.time_ms", time.Since(start)).
118✔
1114
                Str("peer.address", getPeerAddress(grpcPeer)).
118✔
1115
                Array("grpc.request.content", toLogArray(wrapped.requests...)).
118✔
1116
                Array("grpc.response.content", toLogArray(wrapped.responses...)).
118✔
1117
                Str("protocol", "grpc").
118✔
1118
                Msg("gRPC call completed")
118✔
1119

118✔
1120
        return err
118✔
1121
}
1122

1123
// splitMethodName splits a full gRPC method name of the form
// "/<service>/<method>" into its service and method parts.
//
// The input must contain exactly two slashes; the text before the first slash
// is discarded. Any other shape yields ("unknown", "unknown").
func splitMethodName(fullMethod string) (string, string) {
	const (
		slash   = "/"
		unknown = "unknown"
	)

	// Exactly two separators correspond to exactly three split parts.
	if strings.Count(fullMethod, slash) != 2 { //nolint:mnd
		return unknown, unknown
	}

	first := strings.Index(fullMethod, slash)
	last := strings.LastIndex(fullMethod, slash)

	return fullMethod[first+1 : last], fullMethod[last+1:]
}
1136

1137
func getPeerAddress(p *peer.Peer) string {
187✔
1138
        if p != nil && p.Addr != nil {
374✔
1139
                return p.Addr.String()
187✔
1140
        }
187✔
1141

1142
        return "unknown"
×
1143
}
1144

1145
func protoToJSON(msg any) []byte {
551✔
1146
        if msg == nil || isNilInterface(msg) {
561✔
1147
                return nil
10✔
1148
        }
10✔
1149

1150
        message, ok := msg.(proto.Message)
541✔
1151
        if !ok || message == nil {
541✔
1152
                return nil
×
1153
        }
×
1154

1155
        // Use more robust marshalling options for better JSON output
1156
        marshaller := protojson.MarshalOptions{
541✔
1157
                EmitUnpopulated: false,
541✔
1158
                UseProtoNames:   true,
541✔
1159
                Indent:          "",
541✔
1160
        }
541✔
1161

541✔
1162
        data, err := marshaller.Marshal(message)
541✔
1163
        if err != nil {
541✔
1164
                return nil
×
1165
        }
×
1166

1167
        return data
541✔
1168
}
1169

1170
// isNilInterface reports whether v is nil, either as an untyped nil or as a
// nil pointer, interface, slice, map, channel or function wrapped in a non-nil
// interface value. Values of any other kind are never considered nil.
func isNilInterface(v any) bool {
	if v == nil {
		return true
	}

	val := reflect.ValueOf(v)

	//nolint:exhaustive
	switch val.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
		return val.IsNil()
	}

	return false
}
1184

1185
func toLogArray(items ...any) *zerolog.Array {
236✔
1186
        arr := zerolog.Arr()
236✔
1187

236✔
1188
        for _, item := range items {
649✔
1189
                // Skip nil items (they shouldn't be in the array anymore, but just in case)
413✔
1190
                if item == nil || isNilInterface(item) {
413✔
1191
                        continue
×
1192
                }
1193

1194
                if value := protoToJSON(item); value != nil {
826✔
1195
                        arr = arr.RawJSON(value)
413✔
1196
                } else {
413✔
1197
                        // Fallback to string representation for non-proto messages
×
1198
                        arr = arr.Str(fmt.Sprintf("%v", item))
×
1199
                }
×
1200
        }
1201

1202
        return arr
236✔
1203
}
1204

1205
type loggingStream struct {
1206
        grpc.ServerStream
1207

1208
        requests  []any
1209
        responses []any
1210
}
1211

1212
func (s *loggingStream) SendMsg(m any) error {
141✔
1213
        // Only log non-nil messages
141✔
1214
        if m != nil && !isNilInterface(m) {
282✔
1215
                s.responses = append(s.responses, m)
141✔
1216
        }
141✔
1217

1218
        return s.ServerStream.SendMsg(m)
141✔
1219
}
1220

1221
func (s *loggingStream) RecvMsg(m any) error {
274✔
1222
        // Only log non-nil messages
274✔
1223
        if m != nil && !isNilInterface(m) {
546✔
1224
                s.requests = append(s.requests, m)
272✔
1225
        }
272✔
1226

1227
        return s.ServerStream.RecvMsg(m)
274✔
1228
}
1229

1230
// sendBidiResponses sends response(s) for bidirectional streaming.
1231
//
1232

1233
func (m *grpcMocker) sendBidiResponses(stream grpc.ServerStream, output stuber.Output, stub *stuber.Stub, messageIndex int) error {
15✔
1234
        // For bidirectional streaming, send all elements from Stream if available.
15✔
1235
        if len(output.Stream) > 0 {
27✔
1236
                return m.sendStreamResponses(stream, output, stub, messageIndex)
12✔
1237
        }
12✔
1238

1239
        // Fallback to Data if no Stream available.
1240
        outputMsg, err := m.newOutputMessage(output.Data)
3✔
1241
        if err != nil {
3✔
NEW
1242
                return errors.Wrap(err, "failed to convert response to dynamic message")
×
NEW
1243
        }
×
1244

1245
        return sendStreamMessage(stream, outputMsg)
3✔
1246
}
1247

1248
// sendStreamResponses sends responses from output stream.
1249
//
1250
//nolint:cyclop,nestif
1251
func (m *grpcMocker) sendStreamResponses(stream grpc.ServerStream, output stuber.Output, stub *stuber.Stub, messageIndex int) error {
12✔
1252
        // For stubs with Inputs (multiple input messages), send one response per input message
12✔
1253
        if stub.IsClientStream() {
23✔
1254
                // If only one element is provided in the stream, treat it as a template to be used for every message
11✔
1255
                // The MessageIndex is already applied in handleBidiStream during template processing
11✔
1256
                var idx int
11✔
1257

11✔
1258
                if len(output.Stream) == 0 {
11✔
NEW
1259
                        return nil
×
NEW
1260
                }
×
1261

1262
                if len(output.Stream) == 1 {
11✔
NEW
1263
                        idx = 0
×
1264
                } else {
11✔
1265
                        if messageIndex < 0 || messageIndex >= len(output.Stream) {
11✔
NEW
1266
                                return nil
×
NEW
1267
                        }
×
1268

1269
                        idx = messageIndex
11✔
1270
                }
1271

1272
                streamData, ok := output.Stream[idx].(map[string]any)
11✔
1273
                if !ok {
11✔
NEW
1274
                        return nil
×
NEW
1275
                }
×
1276

1277
                outputMsg, err := m.newOutputMessage(streamData)
11✔
1278
                if err != nil {
11✔
NEW
1279
                        return errors.Wrap(err, "failed to convert response to dynamic message")
×
NEW
1280
                }
×
1281

1282
                return sendStreamMessage(stream, outputMsg)
11✔
1283
        }
1284

1285
        // For stubs with Input (single input message), send all elements from the stream array
1286
        for _, streamElement := range output.Stream {
4✔
1287
                if streamData, ok := streamElement.(map[string]any); ok {
6✔
1288
                        outputMsg, err := m.newOutputMessage(streamData)
3✔
1289
                        if err != nil {
3✔
NEW
1290
                                return errors.Wrap(err, "failed to convert response to dynamic message")
×
NEW
1291
                        }
×
1292

1293
                        if err := sendStreamMessage(stream, outputMsg); err != nil {
3✔
NEW
1294
                                return err //nolint:wrapcheck
×
NEW
1295
                        }
×
1296
                }
1297
        }
1298

1299
        return nil
1✔
1300
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc