• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nogipx / rpc_dart / 23169302222

16 Mar 2026 10:40PM UTC coverage: 76.365% (+0.2%) from 76.156%
23169302222

push

github

nogipx
update deps

3887 of 5090 relevant lines covered (76.37%)

8.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.75
/packages/rpc_dart/lib/src/rpc/streams/base_processor.dart
1
// SPDX-FileCopyrightText: 2025 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
2
// SPDX-FileCopyrightText: 2026 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
3
//
4
// SPDX-License-Identifier: MIT
5

6
part of '_index.dart';
7

8
/// Shared stream processor: zero-copy when no codecs (zero-copy transport required), otherwise serialized.
9
final class StreamProcessor<TRequest extends Object, TResponse extends Object> {
10
  final RpcLogger? _logger;
11
  final IRpcTransport _transport;
12
  final int _streamId;
13
  final String _serviceName;
14
  final String _methodName;
15
  final IRpcCodec<TRequest>? _requestCodec;
16
  final IRpcCodec<TResponse>? _responseCodec;
17

18
  /// RPC context with cancellation/metadata.
19
  final RpcContext? _context;
20

21
  /// Cancellation subscription.
22
  StreamSubscription? _cancellationSubscription;
23

24
  /// Parser for fragmented messages (serialization mode only).
25
  RpcMessageParser? _parser;
26

27
  /// Whether zero-copy mode is active.
28
  final bool _isZeroCopy;
29

30
  /// Incoming requests controller.
31
  final StreamController<TRequest> _requestController =
32
      StreamController<TRequest>();
33

34
  /// Outgoing responses controller.
35
  final StreamController<TResponse> _responseController =
36
      StreamController<TResponse>(sync: true);
37

38
  /// Subscription to outgoing responses.
39
  StreamSubscription<TResponse>? _responseSubscription;
40

41
  /// Subscription to incoming transport messages.
42
  StreamSubscription? _messageSubscription;
43

44
  /// Send sequence to preserve order and await completion before trailers.
45
  Future<void> _sendSequence = Future<void>.value();
46

47
  bool _trailerSent = false;
48

49
  /// Processor active flag.
50
  bool _isActive = true;
51

52
  /// Indicates initial metadata sent.
53
  bool _initialMetadataSent = false;
54

55
  /// Response encoding selected from client's grpc-accept-encoding.
56
  /// Null means identity (no compression).
57
  /// Initially set from server context; overridden by incoming client metadata.
58
  String? _responseEncoding;
59

60
  /// Request encoding advertised by the peer in grpc-encoding.
61
  /// Set when the initial request metadata arrives; used by the decompressor.
62
  String? _requestEncoding;
63

64
  /// Method path `/Service/Method`.
65
  late final String _methodPath;
66

67
  StreamProcessor({
16✔
68
    required IRpcTransport transport,
69
    required int streamId,
70
    required String serviceName,
71
    required String methodName,
72
    IRpcCodec<TRequest>? requestCodec,
73
    IRpcCodec<TResponse>? responseCodec,
74
    RpcContext? context,
75
    RpcLogger? logger,
76
  })  : _transport = transport,
77
        _streamId = streamId,
78
        _serviceName = serviceName,
79
        _methodName = methodName,
80
        _isZeroCopy = requestCodec == null && responseCodec == null,
81
        _requestCodec = requestCodec,
82
        _responseCodec = responseCodec,
83
        _context = context,
84
        _logger = logger?.child('StreamProcessor') {
9✔
85
    // Serialization requires codecs.
86
    if (!_isZeroCopy) {
16✔
87
      if (_requestCodec == null || _responseCodec == null) {
28✔
88
        throw ArgumentError(
1✔
89
          'Codecs are required for serialization mode. '
90
          'For zero-copy leave codecs null.',
91
        );
92
      }
93
      _parser = RpcMessageParser(
28✔
94
        logger: _logger,
14✔
95
        decompressor: (payload) {
1✔
96
          final encoding =
97
              _requestEncoding ?? _context?.getHeader(RpcHeaders.grpcEncoding);
1✔
98
          if (encoding == null || encoding == RpcGrpcCompression.identity) {
1✔
99
            throw RpcException(
×
100
              'Compressed gRPC payload received without grpc-encoding',
101
            );
102
          }
103
          return RpcGrpcCompression.decompress(payload, encoding: encoding);
1✔
104
        },
105
      );
106
    } else {
107
      // Zero-copy requires transport support.
108
      if (!transport.supportsZeroCopy) {
3✔
109
        throw ArgumentError(
1✔
110
          'Zero-copy mode requires a transport with zero-copy support. '
111
          'Provide codecs for network transports.',
112
        );
113
      }
114
    }
115

116
    _methodPath = '/$_serviceName/$_methodName';
64✔
117
    _responseEncoding = _pickResponseEncoding(context);
32✔
118

119
    _logger?.internal(
25✔
120
      'Created ${_isZeroCopy ? "Zero-copy" : "Serialized"} StreamProcessor for $_methodPath [streamId: $_streamId]${_context?.cancellationToken != null ? " with cancellation token" : ""}',
53✔
121
    );
122

123
    _setupCancellationMonitoring();
16✔
124
    _setupResponseHandler();
16✔
125
  }
126

127
  /// Picks the best response encoding from the client's grpc-accept-encoding.
128
  static String? _pickResponseEncoding(RpcContext? context) {
16✔
129
    final accept = context?.getHeader(RpcHeaders.grpcAcceptEncoding);
10✔
130
    return RpcGrpcCompression.selectResponseEncoding(accept);
16✔
131
  }
132

133
  /// Incoming request stream.
134
  Stream<TRequest> get requests => _requestController.stream;
48✔
135

136
  /// Whether processor is active.
137
  bool get isActive => _isActive;
2✔
138

139
  /// Zero-copy mode flag.
140
  bool get isZeroCopy => _isZeroCopy;
2✔
141

142
  /// Configures outgoing response handling.
143
  void _setupResponseHandler() {
16✔
144
    _responseSubscription = _responseController.stream.listen(
64✔
145
      (response) {
16✔
146
        _sendSequence = _sendSequence.then((_) async {
64✔
147
          if (!_isActive) return;
16✔
148

149
          _logger?.internal(
25✔
150
            'Sending response for $_methodPath [streamId: $_streamId]',
27✔
151
          );
152
          try {
153
            if (_isZeroCopy) {
16✔
154
              // Zero-copy path
155
              _logger?.internal(
4✔
156
                'Zero-copy send [streamId: $_streamId]',
4✔
157
              );
158
              await _transport.sendDirectObject(_streamId, response);
6✔
159
              _logger?.internal(
4✔
160
                'Zero-copy response sent for $_methodPath [streamId: $_streamId]',
6✔
161
              );
162
            } else {
163
              // Send initial metadata before the first response frame only when
164
              // we need to advertise compression. Without compression the
165
              // existing behaviour (no initial metadata for streaming) is kept
166
              // so existing tests and in-memory transports are not affected.
167
              if (_responseEncoding != null && !_initialMetadataSent) {
26✔
168
                await _transport.sendMetadata(
24✔
169
                  _streamId,
12✔
170
                  RpcMetadata.forServerInitialResponse(
12✔
171
                    encoding: _responseEncoding,
12✔
172
                  ),
173
                );
174
                _initialMetadataSent = true;
12✔
175
              }
176

177
              // Serialization for network transports
178
              final serialized = _responseCodec!.serialize(response);
28✔
179
              _logger?.internal(
21✔
180
                'Response serialized (${serialized.length} bytes) [streamId: $_streamId]',
21✔
181
              );
182

183
              final useCompression = _responseEncoding != null;
14✔
184
              final payload = useCompression
185
                  ? RpcGrpcCompression.compress(
12✔
186
                      serialized,
187
                      encoding: _responseEncoding!,
12✔
188
                    )
189
                  : serialized;
190
              final framedMessage = RpcMessageFrame.encode(
14✔
191
                payload,
192
                compressed: useCompression,
193
              );
194
              await _transport.sendMessage(_streamId, framedMessage);
42✔
195

196
              _logger?.internal(
19✔
197
                'Response sent for $_methodPath [streamId: $_streamId]',
18✔
198
              );
199
            }
200
          } catch (e, stackTrace) {
201
            // Skip if transport closed
202
            if (e.toString().contains('Transport is closed') ||
2✔
203
                e.toString().contains('closed')) {
2✔
204
              _logger?.internal(
2✔
205
                'Transport closed, skipping response send [streamId: $_streamId]',
2✔
206
              );
207
              return;
208
            }
209
            _logger?.error(
1✔
210
              'Failed to send response [streamId: $_streamId]',
×
211
              error: e,
212
              stackTrace: stackTrace,
213
            );
214
          }
215
        });
216
      },
217
      onError: (error, stackTrace) {
1✔
218
        _logger?.error(
1✔
219
          'Error in response stream for $_methodPath [streamId: $_streamId]',
×
220
          error: error,
221
          stackTrace: stackTrace,
222
        );
223
      },
224
    );
225
  }
226

227
  Future<void> _sendOkTrailerIfNeeded() async {
15✔
228
    if (_trailerSent) return;
15✔
229
    _trailerSent = true;
15✔
230

231
    try {
232
      final trailers = RpcMetadata.forTrailer(RpcStatus.ok);
15✔
233
      await _transport.sendMetadata(_streamId, trailers, endStream: true);
45✔
234
      _logger?.internal(
22✔
235
        'Trailer sent for $_methodPath [streamId: $_streamId]',
24✔
236
      );
237
    } catch (e, stackTrace) {
238
      if (e.toString().contains('Transport is closed') ||
2✔
239
          e.toString().contains('closed')) {
2✔
240
        _logger?.internal(
2✔
241
          'Transport closed, skipping trailer send [streamId: $_streamId]',
2✔
242
        );
243
        return;
244
      }
245
      _logger?.error(
1✔
246
        'Failed to send trailer [streamId: $_streamId]',
×
247
        error: e,
248
        stackTrace: stackTrace,
249
      );
250
    }
251
  }
252

253
  /// Binds the processor to an endpoint message stream.
254
  void bindToMessageStream(Stream<RpcTransportMessage> messageStream) {
16✔
255
    if (_messageSubscription != null) {
16✔
256
      _logger?.logRpcWarning(
2✔
257
        message: 'Stream processor already bound to message stream',
258
        methodPath: _methodPath,
×
259
        streamId: _streamId,
×
260
      );
261
      return;
262
    }
263

264
    _logger?.logStreamBound(methodPath: _methodPath, streamId: _streamId);
40✔
265

266
    _messageSubscription = messageStream.listen(
32✔
267
      _handleMessage,
16✔
268
      onError: (error, stackTrace) {
1✔
269
        _logger?.logRpcError(
1✔
270
          operation: 'message_stream_listen',
271
          error: error,
272
          stackTrace: stackTrace,
273
          methodPath: _methodPath,
×
274
          streamId: _streamId,
×
275
        );
276
        if (!_requestController.isClosed) {
2✔
277
          _requestController.addError(error, stackTrace);
2✔
278
        }
279
      },
280
      onDone: () {
2✔
281
        _logger?.logStreamFinished(
2✔
282
          methodPath: _methodPath,
×
283
          streamId: _streamId,
×
284
          reason: 'message_stream_completed',
285
        );
286
        if (!_requestController.isClosed) {
4✔
287
          _requestController.close();
4✔
288
        }
289
      },
290
    );
291

292
    // Initial metadata is not sent on bind; it is sent with the first response
293
    // or skipped when sending an immediate error.
294
  }
295

296
  /// Checks cancellation token and throws if cancelled.
297
  void _checkCancellation() {
16✔
298
    _context?.cancellationToken?.throwIfCancelled();
27✔
299
  }
300

301
  /// Handles an incoming transport message.
302
  void _handleMessage(RpcTransportMessage message) {
16✔
303
    if (!_isActive) return;
16✔
304

305
    // Check cancellation before handling each message.
306
    try {
307
      _checkCancellation();
16✔
308
    } catch (e) {
309
      _logger?.internal(
×
310
        'Message skipped due to cancellation [streamId: $_streamId]',
×
311
      );
312
      return;
313
    }
314

315
    _logger?.logMessageReceived(
24✔
316
      streamId: message.streamId,
8✔
317
      messageType: message.isMetadataOnly
8✔
318
          ? 'metadata'
319
          : message.isDirect
8✔
320
              ? 'zero_copy'
321
              : 'serialized',
322
      payloadSize: message.payload?.length,
14✔
323
      isDirectPayload: message.isDirect,
8✔
324
    );
325

326
    // Extract encoding hints from initial request metadata.
327
    if (message.isMetadataOnly && message.metadata != null) {
31✔
328
      final meta = message.metadata!;
15✔
329

330
      // grpc-encoding: what the peer used to compress its requests.
331
      final reqEnc = meta.getHeaderValue(RpcHeaders.grpcEncoding);
15✔
332
      if (reqEnc != null && reqEnc != RpcGrpcCompression.identity) {
1✔
333
        _requestEncoding = reqEnc;
1✔
334
      }
335

336
      // grpc-accept-encoding: what the peer can decompress → use for responses.
337
      if (_responseEncoding == null) {
15✔
338
        final accept = meta.getHeaderValue(
7✔
339
          RpcHeaders.grpcAcceptEncoding,
340
        );
341
        if (accept != null) {
342
          for (final enc in accept.split(',').map((e) => e.trim())) {
30✔
343
            if (enc != RpcGrpcCompression.identity &&
6✔
344
                RpcGrpcCompression.isSupported(enc)) {
6✔
345
              _responseEncoding = enc;
6✔
346
              break;
347
            }
348
          }
349
        }
350
      }
351
    }
352

353
    // Zero-copy: direct object.
354
    if (message.isDirect && message.directPayload != null) {
19✔
355
      _processDirectMessage(message.directPayload!);
6✔
356
    }
357
    // Serialized payload.
358
    else if (!message.isMetadataOnly && message.payload != null) {
30✔
359
      _processDataMessage(message.payload!);
28✔
360
    }
361

362
    // End of stream.
363
    if (message.isEndOfStream) {
16✔
364
      _logger?.logStreamFinished(
23✔
365
        methodPath: _methodPath,
8✔
366
        streamId: _streamId,
8✔
367
        reason: 'end_of_stream_received',
368
      );
369
      if (!_requestController.isClosed) {
30✔
370
        _requestController.close();
30✔
371
      }
372
    }
373
  }
374

375
  /// Zero-copy: processes a direct object without serialization.
376
  void _processDirectMessage(Object directPayload) {
3✔
377
    try {
378
      final request = directPayload as TRequest;
379

380
      if (!_requestController.isClosed) {
6✔
381
        _requestController.add(request);
6✔
382
      } else {
383
        _logger?.logRpcWarning(
×
384
          message: 'Cannot add request to closed controller (zero-copy)',
385
          methodPath: _methodPath,
×
386
          streamId: _streamId,
×
387
          metadata: {'transport_type': 'zero_copy'},
×
388
        );
389
      }
390
    } catch (e, stackTrace) {
391
      _logger?.logRpcError(
1✔
392
        operation: 'zero_copy_direct_object_processing',
393
        error: e,
394
        stackTrace: stackTrace,
395
        methodPath: _methodPath,
×
396
        streamId: _streamId,
×
397
        metadata: {'object_type': directPayload.runtimeType.toString()},
×
398
      );
399
      if (!_requestController.isClosed) {
2✔
400
        _requestController.addError(e, stackTrace);
2✔
401
      }
402
    }
403
  }
404

405
  /// Processes a serialized message (serialization mode only).
406
  void _processDataMessage(List<int> messageBytes) {
14✔
407
    if (_isZeroCopy) {
14✔
408
      _logger?.logRpcWarning(
1✔
409
        message: 'Serialized message received in zero-copy mode, ignoring',
410
        methodPath: _methodPath,
×
411
        streamId: _streamId,
×
412
      );
413
      return;
414
    }
415

416
    _logger?.logMessageReceived(
20✔
417
      streamId: _streamId,
6✔
418
      messageType: 'serialized_data',
419
      payloadSize: messageBytes.length,
6✔
420
    );
421

422
    try {
423
      // Convert to Uint8List for parser.
424
      final uint8Message = messageBytes is Uint8List
14✔
425
          ? messageBytes
426
          : Uint8List.fromList(messageBytes);
×
427

428
      final messages = _parser!(uint8Message);
28✔
429

430
      for (var msgBytes in messages) {
28✔
431
        try {
432
          final request = _requestCodec!.deserialize(msgBytes);
28✔
433

434
          if (!_requestController.isClosed) {
26✔
435
            _requestController.add(request);
26✔
436
          } else {
437
            _logger?.logRpcWarning(
×
438
              message: 'Cannot add request to closed controller',
439
              methodPath: _methodPath,
×
440
              streamId: _streamId,
×
441
              metadata: {'message_size': msgBytes.length},
×
442
            );
443
          }
444
        } catch (e, stackTrace) {
445
          _logger?.logRpcError(
1✔
446
            operation: 'request_deserialization',
447
            error: e,
448
            stackTrace: stackTrace,
449
            methodPath: _methodPath,
×
450
            streamId: _streamId,
×
451
            metadata: {'message_size': msgBytes.length},
×
452
          );
453
          if (!_requestController.isClosed) {
2✔
454
            _requestController.addError(e, stackTrace);
2✔
455
          }
456
        }
457
      }
458
    } catch (e, stackTrace) {
459
      _logger?.logRpcError(
1✔
460
        operation: 'message_parsing',
461
        error: e,
462
        stackTrace: stackTrace,
463
        methodPath: _methodPath,
×
464
        streamId: _streamId,
×
465
        metadata: {'message_size': messageBytes.length},
×
466
      );
467
      if (!_requestController.isClosed) {
2✔
468
        _requestController.addError(e, stackTrace);
2✔
469
      }
470
    }
471
  }
472

473
  /// Sends a response to the client.
474
  Future<void> send(TResponse response) async {
16✔
475
    if (!_isActive) {
16✔
476
      _logger?.warning('Attempted to send response on inactive processor');
1✔
477
      return;
478
    }
479

480
    // Check cancellation before sending a response.
481
    try {
482
      _checkCancellation();
16✔
483
    } catch (e) {
484
      _logger?.internal(
1✔
485
        'Response skipped due to cancellation [streamId: $_streamId]',
×
486
      );
487
      return;
488
    }
489

490
    if (!_responseController.isClosed) {
32✔
491
      _responseController.add(response);
32✔
492
    } else {
493
      _logger?.warning('Attempted to send response to closed controller');
×
494
    }
495
  }
496

497
  /// Sends an error to the client.
498
  Future<void> sendError(int statusCode, String message) async {
5✔
499
    if (!_isActive) {
5✔
500
      _logger?.warning('Attempted to send error on inactive processor');
1✔
501
      return;
502
    }
503

504
    _logger?.error(
6✔
505
      'Sending error to client: $statusCode - $message [streamId: $_streamId]',
2✔
506
    );
507

508
    // Wait for pending sends to avoid interleaving the error trailer.
509
    await _sendSequence;
5✔
510

511
    if (!_responseController.isClosed) {
10✔
512
      await _responseController.close();
10✔
513
    }
514

515
    try {
516
      // If initial metadata was not sent, send an error response immediately.
517
      if (!_initialMetadataSent) {
5✔
518
        _logger?.internal(
6✔
519
          'Sending error without initial metadata [streamId: $_streamId]',
2✔
520
        );
521
        // Create combined metadata: initial response + error trailer.
522
        final errorHeaders = [
5✔
523
          RpcHeader(':status', '200'), // HTTP 200 for gRPC
5✔
524
          RpcHeader(
5✔
525
            RpcHeaders.contentType,
526
            RpcHeaders.contentTypeGrpc,
527
          ),
528
          RpcHeader(RpcHeaders.grpcStatus, statusCode.toString()),
10✔
529
        ];
530

531
        if (message.isNotEmpty) {
5✔
532
          errorHeaders.add(
5✔
533
            RpcHeader(
5✔
534
              RpcHeaders.grpcMessage,
535
              RpcMetadata.encodeGrpcMessage(message),
5✔
536
            ),
537
          );
538
        }
539

540
        final errorMetadata = RpcMetadata(errorHeaders);
5✔
541
        await _transport.sendMetadata(
10✔
542
          _streamId,
5✔
543
          errorMetadata,
544
          endStream: true,
545
        );
546
        _initialMetadataSent = true;
5✔
547
      } else {
548
        // Initial metadata already sent; send trailer only.
549
        final trailers = RpcMetadata.forTrailer(statusCode, message: message);
1✔
550
        await _transport.sendMetadata(_streamId, trailers, endStream: true);
3✔
551
      }
552

553
      _logger?.internal('Error sent to client [streamId: $_streamId]');
5✔
554
      _trailerSent = true;
5✔
555
    } catch (e, stackTrace) {
556
      // Skip if transport is closed.
557
      if (e.toString().contains('Transport is closed') ||
2✔
558
          e.toString().contains('closed')) {
×
559
        _logger?.internal(
2✔
560
          'Transport closed, skipping error send [streamId: $_streamId]',
2✔
561
        );
562
        return;
563
      }
564
      _logger?.error(
×
565
        'Failed to send error to client [streamId: $_streamId]',
×
566
        error: e,
567
        stackTrace: stackTrace,
568
      );
569
    }
570
  }
571

572
  /// Finishes sending responses.
573
  Future<void> finishSending() async {
15✔
574
    if (!_isActive) return;
15✔
575

576
    _logger?.internal(
24✔
577
      'Finishing response send for $_methodPath [streamId: $_streamId]',
27✔
578
    );
579

580
    await _sendSequence;
15✔
581

582
    if (!_responseController.isClosed) {
30✔
583
      await _responseController.close();
30✔
584
    }
585

586
    await _sendOkTrailerIfNeeded();
15✔
587
  }
588

589
  /// Closes the processor and frees resources.
590
  Future<void> close() async {
16✔
591
    if (!_isActive) return;
16✔
592

593
    _logger?.internal(
25✔
594
      'Closing StreamProcessor for $_methodPath [streamId: $_streamId]',
27✔
595
    );
596
    _isActive = false;
16✔
597

598
    // Cancel all subscriptions.
599
    unawaited(_messageSubscription?.cancel());
48✔
600
    _messageSubscription = null;
16✔
601

602
    unawaited(_cancellationSubscription?.cancel());
32✔
603
    _cancellationSubscription = null;
16✔
604

605
    unawaited(_responseSubscription?.cancel());
48✔
606
    _responseSubscription = null;
16✔
607

608
    if (!_requestController.isClosed) {
32✔
609
      _requestController.close();
12✔
610
    }
611

612
    if (!_responseController.isClosed) {
32✔
613
      _responseController.close();
14✔
614
    }
615
  }
616

617
  /// Sets up cancellation monitoring.
618
  void _setupCancellationMonitoring() {
16✔
619
    if (_context?.cancellationToken != null) {
26✔
620
      _cancellationSubscription =
1✔
621
          _context!.cancellationToken!.cancelled.asStream().listen(
5✔
622
        (_) {
1✔
623
          _logger?.internal(
1✔
624
            'Operation cancelled, shutting down processor [streamId: $_streamId]',
×
625
          );
626
          _isActive = false;
1✔
627

628
          final reason =
629
              _context!.cancellationToken!.reason ?? 'Operation was cancelled';
3✔
630
          final cancelledException = RpcCancelledException(reason);
1✔
631

632
          if (!_requestController.isClosed) {
2✔
633
            _requestController.addError(cancelledException);
2✔
634
          }
635
          if (!_responseController.isClosed) {
2✔
636
            _responseController.addError(cancelledException);
2✔
637
          }
638

639
          // Cancel subscriptions.
640
          _messageSubscription?.cancel();
2✔
641
          _cancellationSubscription?.cancel();
2✔
642
        },
643
        onError: (error, stackTrace) {
×
644
          _logger?.error(
×
645
            'Error monitoring cancellation [streamId: $_streamId]',
×
646
            error: error,
647
            stackTrace: stackTrace,
648
          );
649
        },
650
      );
651
    }
652
  }
653
}
654

655
/// Shared processor for client RPC stream calls.
656
///
657
/// Automatically selects mode:
658
/// - Zero-copy for in-memory transport (codecs not needed)
659
/// - Serialization for network transports (codecs required)
660
///
661
/// Benefits:
662
/// - Reuse across stream call types
663
/// - Avoids race conditions
664
/// - Clear separation of concerns
665
/// - Testable without out-of-process dependencies
666
/// - Works with any object types, not just IRpcSerializable
667
/// - Auto-optimized for in-memory transport
668
final class CallProcessor<TRequest extends Object, TResponse extends Object> {
669
  final RpcLogger? _logger;
670
  final IRpcTransport _transport;
671
  final int _streamId;
672
  final String _serviceName;
673
  final String _methodName;
674
  final IRpcCodec<TRequest>? _requestCodec;
675
  final IRpcCodec<TResponse>? _responseCodec;
676

677
  /// RPC context for metadata, timeouts, and cancellation.
678
  final RpcContext? _context;
679

680
  /// Cancellation subscription.
681
  StreamSubscription? _cancellationSubscription;
682

683
  /// Parser for fragmented messages (serialization mode only).
684
  RpcMessageParser? _parser;
685

686
  String? _peerGrpcEncoding;
687

688
  /// Processor mode flag.
689
  final bool _isZeroCopy;
690

691
  /// Outgoing request controller.
692
  final StreamController<TRequest> _requestController =
693
      StreamController<TRequest>();
694

695
  /// Incoming response controller.
696
  final StreamController<RpcMessage<TResponse>> _responseController =
697
      StreamController<RpcMessage<TResponse>>();
698

699
  /// Outgoing request subscription.
700
  StreamSubscription? _requestSubscription;
701

702
  /// Incoming response subscription.
703
  StreamSubscription? _responseSubscription;
704

705
  /// Processor active flag.
706
  bool _isActive = true;
707

708
  /// Whether initial metadata was sent.
709
  bool _initialMetadataSent = false;
710

711
  /// Method path in /Service/Method format.
712
  late final String _methodPath;
713

714
  CallProcessor({
16✔
715
    required IRpcTransport transport,
716
    required String serviceName,
717
    required String methodName,
718
    IRpcCodec<TRequest>? requestCodec,
719
    IRpcCodec<TResponse>? responseCodec,
720
    RpcContext? context,
721
    RpcLogger? logger,
722
  })  : _transport = transport,
723
        _streamId = transport.createStream(),
16✔
724
        _serviceName = serviceName,
725
        _methodName = methodName,
726
        _isZeroCopy = requestCodec == null && responseCodec == null,
727
        _requestCodec = requestCodec,
728
        _responseCodec = responseCodec,
729
        _context = context,
730
        _logger = logger?.child('CallProcessor') {
8✔
731
    // Validation: codecs are required for serialization mode.
732
    if (!_isZeroCopy) {
16✔
733
      if (_requestCodec == null || _responseCodec == null) {
28✔
734
        throw ArgumentError(
×
735
          'Codecs are required for serialization mode. '
736
          'For zero-copy leave codecs null.',
737
        );
738
      }
739
      _parser = RpcMessageParser(
28✔
740
        logger: _logger,
14✔
741
        decompressor: (payload) {
11✔
742
          final encoding = _peerGrpcEncoding;
11✔
743
          if (encoding == null || encoding == RpcGrpcCompression.identity) {
11✔
744
            throw RpcException(
×
745
              'Compressed gRPC payload received without grpc-encoding',
746
            );
747
          }
748
          return RpcGrpcCompression.decompress(payload, encoding: encoding);
11✔
749
        },
750
      );
751
    } else {
752
      // Zero-copy mode requires transport support.
753
      if (!transport.supportsZeroCopy) {
3✔
754
        throw ArgumentError(
×
755
          'Zero-copy mode requires a transport with zero-copy support. '
756
          'Provide codecs for network transports.',
757
        );
758
      }
759
    }
760

761
    _methodPath = '/$_serviceName/$_methodName';
64✔
762

763
    _logger?.internal(
24✔
764
      'Created ${_isZeroCopy ? "Zero-copy" : "Serialized"} CallProcessor for $_methodPath [streamId: $_streamId]${_context?.cancellationToken != null ? " with cancellation token" : ""}',
48✔
765
    );
766

767
    // Validate context before starting.
768
    _checkContextBeforeCall();
16✔
769

770
    _setupCancellationMonitoring();
16✔
771
    _setupRequestHandler();
16✔
772
    _setupResponseHandler();
16✔
773
  }
774

775
  /// Incoming responses from server.
776
  Stream<RpcMessage<TResponse>> get responses => _responseController.stream;
48✔
777

778
  /// Whether processor is active.
779
  bool get isActive => _isActive;
2✔
780

781
  /// Stream ID.
782
  int get streamId => _streamId;
4✔
783

784
  /// Zero-copy mode flag.
785
  bool get isZeroCopy => _isZeroCopy;
×
786

787
  /// Configures outgoing request handling.
788
  void _setupRequestHandler() {
16✔
789
    _requestSubscription = _requestController.stream.listen(
64✔
790
      (request) async {
16✔
791
        if (!_isActive) return;
16✔
792

793
        try {
794
          // Send initial metadata with the first request.
795
          if (!_initialMetadataSent) {
16✔
796
            await _sendInitialMetadata();
16✔
797
            _initialMetadataSent = true;
16✔
798
          }
799

800
          _logger?.internal(
24✔
801
            'Sending request for $_methodPath [streamId: $_streamId]',
24✔
802
          );
803

804
          if (_isZeroCopy) {
16✔
805
            // Zero-copy path.
806
            _logger?.internal(
5✔
807
              'Zero-copy request send [streamId: $_streamId]',
4✔
808
            );
809
            await _transport.sendDirectObject(_streamId, request);
9✔
810
            _logger?.internal(
5✔
811
              'Zero-copy request sent for $_methodPath [streamId: $_streamId]',
6✔
812
            );
813
          } else {
814
            // Serialization for network transports.
815
            final serialized = _requestCodec!.serialize(request);
28✔
816
            _logger?.internal(
20✔
817
              'Request serialized (${serialized.length} bytes) [streamId: $_streamId]',
18✔
818
            );
819

820
            final requestEncoding =
821
                _context?.getHeader(RpcHeaders.grpcEncoding);
22✔
822
            if (requestEncoding != null &&
823
                requestEncoding != RpcGrpcCompression.identity &&
1✔
824
                !RpcGrpcCompression.isSupported(requestEncoding)) {
1✔
825
              throw RpcException(
×
826
                'Unsupported grpc-encoding: $requestEncoding. '
827
                'Supported: ${RpcGrpcCompression.supportedEncodings().join(', ')}',
×
828
              );
829
            }
830
            final useCompression = requestEncoding != null &&
831
                requestEncoding != RpcGrpcCompression.identity;
1✔
832
            final payload = useCompression
833
                ? RpcGrpcCompression.compress(
1✔
834
                    serialized,
835
                    encoding: requestEncoding,
836
                  )
837
                : serialized;
838

839
            final framedMessage = RpcMessageFrame.encode(
14✔
840
              payload,
841
              compressed: useCompression,
842
            );
843
            await _transport.sendMessage(_streamId, framedMessage);
42✔
844

845
            _logger?.internal(
20✔
846
              'Request sent for $_methodPath [streamId: $_streamId]',
18✔
847
            );
848
          }
849
        } catch (e, stackTrace) {
850
          _logger?.error(
×
851
            'Failed to send request [streamId: $_streamId]',
×
852
            error: e,
853
            stackTrace: stackTrace,
854
          );
855
          if (!_responseController.isClosed) {
×
856
            _responseController.addError(e, stackTrace);
×
857
          }
858

859
          // Critical: on routing error stop immediately to prevent further sends.
860
          if (!_requestController.isClosed) {
×
861
            _requestController.close();
×
862
          }
863
        }
864
      },
865
      onDone: () async {
15✔
866
        if (!_isActive) return;
15✔
867

868
        try {
869
          await _transport.finishSending(_streamId);
45✔
870
          _logger?.internal(
23✔
871
            'finishSending completed for $_methodPath [streamId: $_streamId]',
24✔
872
          );
873
        } catch (e, stackTrace) {
874
          _logger?.error(
×
875
            'Failed to finish sending requests [streamId: $_streamId]',
×
876
            error: e,
877
            stackTrace: stackTrace,
878
          );
879
        }
880
      },
881
      onError: (error, stackTrace) {
×
882
        _logger?.error(
×
883
          'Error in request stream for $_methodPath [streamId: $_streamId]',
×
884
          error: error,
885
          stackTrace: stackTrace,
886
        );
887
        if (!_responseController.isClosed) {
×
888
          _responseController.addError(error, stackTrace);
×
889
        }
890
      },
891
    );
892
  }
893

894
  /// Configures incoming response handling.
895
  void _setupResponseHandler() {
16✔
896
    _responseSubscription = _transport.getMessagesForStream(_streamId).listen(
80✔
897
      _handleResponse,
16✔
898
      onError: (error, stackTrace) {
×
899
        _logger?.error(
×
900
          'Error in response stream',
901
          error: error,
902
          stackTrace: stackTrace,
903
        );
904
        if (!_responseController.isClosed) {
×
905
          _responseController.addError(error, stackTrace);
×
906
        }
907
      },
908
      onDone: () {
1✔
909
        _logger?.internal(
1✔
910
          'Response stream completed for $_methodPath [streamId: $_streamId]',
×
911
        );
912
        if (!_responseController.isClosed) {
2✔
913
          _responseController.close();
2✔
914
        }
915
      },
916
    );
917
  }
918

919
  /// Sends initial metadata with context support.
920
  Future<void> _sendInitialMetadata() async {
16✔
921
    _logger?.internal(
24✔
922
      'Sending initial metadata for $_methodPath [streamId: $_streamId]',
24✔
923
    );
924

925
    final baseMetadata =
926
        RpcMetadata.forClientRequest(_serviceName, _methodName);
48✔
927

928
    // Use a map so context headers naturally override base headers,
929
    // preventing duplicates (e.g. grpc-accept-encoding).
930
    final headerMap = <String, String>{
16✔
931
      for (final h in baseMetadata.headers) h.name: h.value,
64✔
932
    };
933

934
    if (_context != null) {
16✔
935
      // Context headers override base — handles grpc-accept-encoding dedup.
936
      headerMap.addAll(_context!.headers);
30✔
937

938
      if (_context!.traceId != null) {
20✔
939
        headerMap[RpcHeaders.xTraceId] = _context!.traceId!;
27✔
940
      }
941
      headerMap[RpcHeaders.xRequestId] = _context!.requestId;
30✔
942

943
      if (_context!.deadline != null) {
20✔
944
        final timeout = _context!.remainingTime;
2✔
945
        if (timeout != null) {
946
          headerMap[RpcHeaders.grpcTimeout] =
1✔
947
              RpcMetadata.encodeGrpcTimeout(timeout);
1✔
948
        }
949
      }
950

951
      _logger?.internal(
18✔
952
        'Context headers added: ${_context!.headers.length} custom + system [streamId: $_streamId]',
40✔
953
      );
954
    } else {
955
      headerMap[RpcHeaders.xRequestId] = RpcContext.empty().requestId;
24✔
956

957
      _logger?.internal(
8✔
958
        'Added base request-id for null context [streamId: $_streamId]',
×
959
      );
960
    }
961

962
    final metadata = RpcMetadata(
16✔
963
      [for (final e in headerMap.entries) RpcHeader(e.key, e.value)],
80✔
964
      methodPath: baseMetadata.methodPath,
16✔
965
    );
966
    await _transport.sendMetadata(_streamId, metadata);
48✔
967

968
    _logger?.internal(
24✔
969
      'Initial metadata sent for $_methodPath [streamId: $_streamId]',
24✔
970
    );
971
  }
972

973
  /// Sets up cancellation monitoring for CallProcessor.
974
  void _setupCancellationMonitoring() {
16✔
975
    if (_context?.cancellationToken != null) {
26✔
976
      _cancellationSubscription =
9✔
977
          _context!.cancellationToken!.cancelled.asStream().listen(
45✔
978
        (_) async {
1✔
979
          _logger?.internal(
1✔
980
            'Operation cancelled by client, notifying server [streamId: $_streamId]',
×
981
          );
982

983
          try {
984
            // Send a cancellation notice to the server.
985
            final reason = _context!.cancellationToken!.reason ??
3✔
986
                'Operation cancelled by client';
987
            await _sendCancellationToServer(reason);
1✔
988
          } catch (e, stackTrace) {
989
            _logger?.error(
×
990
              'Failed to send cancellation notice [streamId: $_streamId]',
×
991
              error: e,
992
              stackTrace: stackTrace,
993
            );
994
          }
995

996
          _isActive = false;
1✔
997
          final cancelledException = RpcCancelledException(
1✔
998
            _context!.cancellationToken!.reason ?? 'Operation was cancelled',
3✔
999
          );
1000

1001
          if (!_requestController.isClosed) {
2✔
1002
            _requestController.addError(cancelledException);
2✔
1003
          }
1004
          if (!_responseController.isClosed) {
2✔
1005
            _responseController.addError(cancelledException);
2✔
1006
          }
1007

1008
          // Cancel subscriptions.
1009
          await _requestSubscription?.cancel();
2✔
1010
          await _responseSubscription?.cancel();
2✔
1011
          _cancellationSubscription?.cancel();
2✔
1012
        },
1013
        onError: (error, stackTrace) {
×
1014
          _logger?.error(
×
1015
            'Error monitoring cancellation [streamId: $_streamId]',
×
1016
            error: error,
1017
            stackTrace: stackTrace,
1018
          );
1019
        },
1020
      );
1021
    }
1022
  }
1023

1024
  /// Sends a cancellation notice to the server.
1025
  Future<void> _sendCancellationToServer(String reason) async {
1✔
1026
    try {
1027
      // Build metadata with cancellation details.
1028
      final cancellationHeaders = [
1✔
1029
        RpcHeader('x-client-cancelled', 'true'),
1✔
1030
        RpcHeader('x-cancellation-reason', reason),
1✔
1031
        RpcHeader(
1✔
1032
          RpcHeaders.grpcStatus,
1033
          RpcStatus.cancelled.toString(),
1✔
1034
        ),
1035
      ];
1036

1037
      final cancellationMetadata = RpcMetadata(cancellationHeaders);
1✔
1038

1039
      _logger?.internal(
1✔
1040
        'Sending cancellation notice to server [streamId: $_streamId]',
×
1041
      );
1042

1043
      await _transport.sendMetadata(
2✔
1044
        _streamId,
1✔
1045
        cancellationMetadata,
1046
        endStream: true,
1047
      );
1048

1049
      _logger?.internal(
1✔
1050
        'Cancellation notice sent to server [streamId: $_streamId]',
×
1051
      );
1052
    } catch (e, stackTrace) {
1053
      _logger?.error(
×
1054
        'Failed to send cancellation metadata [streamId: $_streamId]',
×
1055
        error: e,
1056
        stackTrace: stackTrace,
1057
      );
1058
    }
1059
  }
1060

1061
  /// Validates context before making the call.
1062
  void _checkContextBeforeCall() {
16✔
1063
    if (_context == null) return;
16✔
1064

1065
    // Check cancellation.
1066
    _context!.cancellationToken?.throwIfCancelled();
29✔
1067

1068
    // Check deadline.
1069
    if (_context!.isExpired) {
20✔
1070
      throw RpcDeadlineExceededException(_context!.deadline!, Duration.zero);
3✔
1071
    }
1072

1073
    _logger?.internal(
18✔
1074
      'Context verified: requestId=${_context!.requestId}, traceId=${_context!.traceId} [streamId: $_streamId]',
48✔
1075
    );
1076
  }
1077

1078
  /// Handles an incoming response.
1079
  void _handleResponse(RpcTransportMessage message) {
16✔
1080
    if (!_isActive) return;
16✔
1081

1082
    _logger?.internal(
24✔
1083
      'Handling response [streamId: ${message.streamId}, isMetadataOnly: ${message.isMetadataOnly}, hasPayload: ${message.payload != null}, isDirect: ${message.isDirect}]',
40✔
1084
    );
1085

1086
    try {
1087
      // Handle metadata.
1088
      if (message.isMetadataOnly) {
16✔
1089
        final encoding = message.metadata?.getHeaderValue(
30✔
1090
          RpcHeaders.grpcEncoding,
1091
        );
1092
        if (encoding != null) {
1093
          _peerGrpcEncoding = encoding;
11✔
1094
        }
1095

1096
        final rpcMessage = RpcMessage.withMetadata<TResponse>(
15✔
1097
          message.metadata!,
15✔
1098
          isEndOfStream: message.isEndOfStream,
15✔
1099
        );
1100

1101
        if (!_responseController.isClosed) {
30✔
1102
          _responseController.add(rpcMessage);
30✔
1103
          _logger?.internal(
23✔
1104
            'Metadata pushed to response stream [streamId: $_streamId]',
16✔
1105
          );
1106
        }
1107
      }
1108

1109
      // Zero-copy: process direct object.
1110
      if (message.isDirect && message.directPayload != null) {
19✔
1111
        _processDirectResponse(message.directPayload!);
6✔
1112
      }
1113
      // Handle serialized payload.
1114
      else if (!message.isMetadataOnly && message.payload != null) {
30✔
1115
        _processResponseData(message.payload!);
28✔
1116
      }
1117

1118
      // Finish stream on END_STREAM.
1119
      if (message.isEndOfStream) {
16✔
1120
        _logger?.internal(
23✔
1121
          'END_STREAM received, closing response stream [streamId: $_streamId]',
16✔
1122
        );
1123
        if (!_responseController.isClosed) {
30✔
1124
          _responseController.close();
30✔
1125
        }
1126
      }
1127
    } catch (e, stackTrace) {
1128
      _logger?.error(
×
1129
        'Failed to process response [streamId: $_streamId]',
×
1130
        error: e,
1131
        stackTrace: stackTrace,
1132
      );
1133
      if (!_responseController.isClosed) {
×
1134
        _responseController.addError(e, stackTrace);
×
1135
      }
1136
    }
1137
  }
1138

1139
  /// Zero-copy: handles a direct response object without serialization.
1140
  void _processDirectResponse(Object directPayload) {
3✔
1141
    _logger?.internal(
5✔
1142
      'Zero-copy response handling [streamId: $_streamId, type: ${directPayload.runtimeType}]',
6✔
1143
    );
1144

1145
    try {
1146
      final response = directPayload as TResponse;
1147
      final rpcMessage = RpcMessage.withPayload<TResponse>(response);
2✔
1148

1149
      if (!_responseController.isClosed) {
4✔
1150
        _responseController.add(rpcMessage);
4✔
1151
        _logger?.internal(
4✔
1152
          'Zero-copy response added to response stream [streamId: $_streamId]',
4✔
1153
        );
1154
      } else {
1155
        _logger?.warning(
×
1156
          'Zero-copy: cannot add response to closed controller [streamId: $_streamId]',
×
1157
        );
1158
      }
1159
    } catch (e, stackTrace) {
1160
      _logger?.error(
1✔
1161
        'Zero-copy direct response handling error [streamId: $_streamId]',
×
1162
        error: e,
1163
        stackTrace: stackTrace,
1164
      );
1165
      if (!_responseController.isClosed) {
2✔
1166
        _responseController.addError(e, stackTrace);
2✔
1167
      }
1168
    }
1169
  }
1170

1171
  /// Processes response data (serialization mode only).
1172
  void _processResponseData(List<int> messageBytes) {
14✔
1173
    if (_isZeroCopy) {
14✔
1174
      _logger?.logRpcWarning(
1✔
1175
        message: 'Serialized response received in zero-copy mode, ignoring',
1176
        methodPath: _methodPath,
×
1177
        streamId: _streamId,
×
1178
      );
1179
      return;
1180
    }
1181

1182
    _logger?.internal(
20✔
1183
      'Received response payload: ${messageBytes.length} bytes [streamId: $_streamId]',
18✔
1184
    );
1185

1186
    try {
1187
      final uint8Message = messageBytes is Uint8List
14✔
1188
          ? messageBytes
1189
          : Uint8List.fromList(messageBytes);
×
1190

1191
      final messages = _parser!(uint8Message);
28✔
1192
      _logger?.internal(
20✔
1193
        'Parser extracted ${messages.length} messages from frame [streamId: $_streamId]',
18✔
1194
      );
1195

1196
      for (var msgBytes in messages) {
28✔
1197
        try {
1198
          _logger?.internal(
20✔
1199
            'Deserializing response of ${msgBytes.length} bytes [streamId: $_streamId]',
18✔
1200
          );
1201
          final response = _responseCodec!.deserialize(msgBytes);
28✔
1202

1203
          final rpcMessage = RpcMessage.withPayload<TResponse>(response);
13✔
1204

1205
          if (!_responseController.isClosed) {
26✔
1206
            _responseController.add(rpcMessage);
26✔
1207
            _logger?.internal(
19✔
1208
              'Deserialized response added to stream [streamId: $_streamId]',
12✔
1209
            );
1210
          } else {
1211
            _logger?.warning(
×
1212
              'Cannot add response to closed controller [streamId: $_streamId]',
×
1213
            );
1214
          }
1215
        } catch (e, stackTrace) {
1216
          _logger?.error(
1✔
1217
            'Failed to deserialize response [streamId: $_streamId]',
×
1218
            error: e,
1219
            stackTrace: stackTrace,
1220
          );
1221
          if (!_responseController.isClosed) {
2✔
1222
            _responseController.addError(e, stackTrace);
2✔
1223
          }
1224
        }
1225
      }
1226
    } catch (e, stackTrace) {
1227
      _logger?.error(
×
1228
        'Failed to parse response [streamId: $_streamId]',
×
1229
        error: e,
1230
        stackTrace: stackTrace,
1231
      );
1232
      if (!_responseController.isClosed) {
×
1233
        _responseController.addError(e, stackTrace);
×
1234
      }
1235
    }
1236
  }
1237

1238
  /// Sends a request to the server.
1239
  Future<void> send(TRequest request) async {
16✔
1240
    if (!_isActive) {
16✔
1241
      _logger?.warning('Attempted to send request on inactive processor');
1✔
1242
      return;
1243
    }
1244

1245
    if (!_requestController.isClosed) {
32✔
1246
      _requestController.add(request);
32✔
1247
    } else {
1248
      _logger?.warning('Attempted to send request to closed controller');
×
1249
    }
1250
  }
1251

1252
  /// Finishes sending requests.
1253
  Future<void> finishSending() async {
15✔
1254
    if (!_isActive) return;
15✔
1255

1256
    _logger?.internal(
23✔
1257
      'Finishing request send for $_methodPath [streamId: $_streamId]',
24✔
1258
    );
1259

1260
    if (!_requestController.isClosed) {
30✔
1261
      await _requestController.close();
30✔
1262
    }
1263
  }
1264

1265
  /// Closes the processor and releases resources.
1266
  Future<void> close() async {
16✔
1267
    if (!_isActive) return;
16✔
1268

1269
    _logger?.internal(
24✔
1270
      'Closing CallProcessor for $_methodPath [streamId: $_streamId]',
24✔
1271
    );
1272
    _isActive = false;
16✔
1273

1274
    await _requestSubscription?.cancel();
32✔
1275
    _requestSubscription = null;
16✔
1276

1277
    await _responseSubscription?.cancel();
32✔
1278
    _responseSubscription = null;
16✔
1279

1280
    await _cancellationSubscription?.cancel();
24✔
1281
    _cancellationSubscription = null;
16✔
1282

1283
    if (!_requestController.isClosed) {
32✔
1284
      _requestController.close();
10✔
1285
    }
1286

1287
    if (!_responseController.isClosed) {
32✔
1288
      _responseController.close();
18✔
1289
    }
1290
  }
1291
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc