• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nogipx / rpc_dart / 23169302222

16 Mar 2026 10:40PM UTC coverage: 76.365% (+0.2%) from 76.156%
23169302222

push

github

nogipx
update deps

3887 of 5090 relevant lines covered (76.37%)

8.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.67
/packages/rpc_dart/lib/src/core/parser.dart
1
// SPDX-FileCopyrightText: 2025 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
2
// SPDX-FileCopyrightText: 2026 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
3
//
4
// SPDX-License-Identifier: MIT
5

6
import 'dart:typed_data';
7

8
import 'errors.dart';
9
import '../logs/_logs.dart';
10
import 'protocol.dart';
11

12
/// Mutable bookkeeping carried between successive parser invocations.
///
/// Holds the bytes received so far together with what is known about the
/// message currently being assembled, so that a message split across several
/// fragments (or several messages packed into one fragment) can be tracked.
final class _MessageParserState {
  /// Bytes accumulated so far that have not been consumed yet.
  List<int> buffer = <int>[];

  /// Length of the message currently being assembled.
  ///
  /// Stays `null` until the header of that message has been read.
  int? expectedMessageLength;

  /// Whether the message currently being assembled is flagged as compressed.
  bool isCompressed = false;

  /// Forgets the per-message fields so the next message can be processed.
  ///
  /// Only [expectedMessageLength] and [isCompressed] are restored to their
  /// initial values; [buffer] is intentionally left untouched.
  void reset() {
    this
      ..expectedMessageLength = null
      ..isCompressed = false;
  }
}
32

33
/// Parser that reassembles fragmented gRPC messages.
///
/// Collects full messages from HTTP/2 DATA frames where gRPC payloads may not
/// align with frame boundaries. Every message is expected to be preceded by a
/// prefix of [RpcConstants.messagePrefixSize] bytes from which the compression
/// flag and the payload length are read.
///
/// The parser is stateful: bytes of an incomplete message are kept between
/// invocations of [call]. After any failure the buffered bytes are discarded.
final class RpcMessageParser {
  /// Optional sink for diagnostics; nothing is logged when `null`.
  final RpcLogger? _logger;

  /// Largest accepted payload size in bytes, checked both against the length
  /// announced in the header and against the size after decompression.
  final int _maxMessageLength;

  /// Largest number of bytes that may be buffered at once.
  final int _maxBufferedBytes;

  /// Optional decompressor applied to payloads flagged as compressed.
  final Uint8List Function(Uint8List payload)? _decompressor;

  /// Largest number of messages a single [call] may produce.
  final int _maxMessagesPerChunk;

  /// Creates a parser.
  ///
  /// * [logger] receives error and internal diagnostics, if provided.
  /// * [maxMessageLength] caps the size of a single payload (64 MiB by
  ///   default).
  /// * [maxBufferedBytes] caps the amount of buffered data; when omitted it
  ///   is [maxMessageLength] plus [RpcConstants.messagePrefixSize].
  /// * [decompressor] is applied to payloads whose header has the compression
  ///   flag set. When omitted, such payloads are re-encoded as a complete
  ///   frame with the compression flag set and returned as-is.
  /// * [maxMessagesPerChunk] caps how many messages one [call] may return.
  RpcMessageParser({
    RpcLogger? logger,
    int maxMessageLength = 64 * 1024 * 1024,
    int? maxBufferedBytes,
    Uint8List Function(Uint8List payload)? decompressor,
    int maxMessagesPerChunk = 1024,
  })  : _logger = logger,
        _maxMessageLength = maxMessageLength,
        _maxBufferedBytes = maxBufferedBytes ??
            (maxMessageLength + RpcConstants.messagePrefixSize),
        _decompressor = decompressor,
        _maxMessagesPerChunk = maxMessagesPerChunk;

  /// Internal parser state.
  final _MessageParserState _state = _MessageParserState();

  /// Processes an incoming data fragment and returns complete messages.
  ///
  /// Accumulates data in a buffer and uses the message prefix to extract
  /// complete messages. Can emit multiple messages from one fragment or keep
  /// buffering until a full message is available.
  ///
  /// [data] New chunk of incoming data.
  /// Returns the list of complete messages extracted (possibly empty).
  ///
  /// Throws [RpcException] when the buffered data, a single payload (before
  /// or after decompression) or the number of messages in this chunk exceeds
  /// the configured limit. Exceptions raised while parsing a header or by the
  /// decompressor are logged and rethrown unchanged. In every failure case
  /// all buffered data is dropped before the exception propagates.
  List<Uint8List> call(Uint8List data) {
    try {
      return _call(data);
    } catch (e, trace) {
      // Cleanup lives here rather than at each throw site so that it also
      // covers exceptions raised by the decompressor. Without it the failing
      // frame would stay buffered and be retried by subsequent calls.
      _state.buffer.clear();
      _state.reset();
      _logger?.error(
        'Failed to parse incoming data: $e',
        error: e,
        stackTrace: trace,
      );
      rethrow;
    }
  }

  /// Appends [data] to the buffer and extracts every complete message.
  ///
  /// Does not clean up on failure; [call] is responsible for that.
  List<Uint8List> _call(Uint8List data) {
    final result = <Uint8List>[];
    final prefixSize = RpcConstants.messagePrefixSize;

    // Append data to the buffer and enforce the buffering limit.
    final buffer = _state.buffer..addAll(data);
    if (buffer.length > _maxBufferedBytes) {
      throw RpcException(
        'gRPC frame buffer overflow: ${buffer.length} bytes (max: $_maxBufferedBytes)',
      );
    }

    // Index of the first unconsumed byte. Consumed bytes are dropped once,
    // after the loop, instead of re-copying the buffer for every message.
    var offset = 0;

    while (true) {
      final int length;
      final pending = _state.expectedMessageLength;
      if (pending != null) {
        // The header was consumed by an earlier iteration or call; only the
        // payload is outstanding, so the prefix-size requirement does not
        // apply (payloads may be shorter than the prefix).
        length = pending;
      } else {
        if (buffer.length - offset < prefixSize) break;
        length = _readHeader(buffer, offset);
        if (length > _maxMessageLength) {
          throw RpcException(
            'gRPC frame payload is too large: $length bytes (max: $_maxMessageLength)',
          );
        }
        offset += prefixSize;
      }

      // Not enough data yet; wait for the next chunk.
      if (buffer.length - offset < length) break;

      // Copy the payload out of the buffer.
      var payload = Uint8List(length)..setRange(0, length, buffer, offset);
      if (_state.isCompressed) {
        final decompressor = _decompressor;
        if (decompressor == null) {
          // No decompressor at this layer: reconstruct the complete gRPC
          // frame (with compression bit set) and pass it through so the
          // application layer can decompress it.
          payload = RpcMessageFrame.encode(payload, compressed: true);
        } else {
          payload = decompressor(payload);
          if (payload.length > _maxMessageLength) {
            throw RpcException(
              'Decompressed gRPC payload is too large: ${payload.length} bytes (max: $_maxMessageLength)',
            );
          }
        }
      }

      result.add(payload);
      if (result.length > _maxMessagesPerChunk) {
        throw RpcException(
          'Too many gRPC messages in a single chunk: ${result.length} (max: $_maxMessagesPerChunk)',
        );
      }

      // Mark the payload as consumed and get ready for the next message.
      offset += length;
      _state.reset();
    }

    // Keep only the bytes that belong to a not-yet-complete message. A fresh
    // list is used so a large backing store is not retained.
    if (offset > 0) {
      _state.buffer = buffer.sublist(offset);
    }

    _logger?.internal(
      'Chunk processed, messages extracted: ${result.length}',
    );
    return result;
  }

  /// Parses the message prefix that starts at [offset] in [buffer].
  ///
  /// Records the compression flag and payload length in [_state] and returns
  /// the payload length. The caller must ensure that at least
  /// [RpcConstants.messagePrefixSize] bytes are available at [offset].
  /// Parsing failures are logged and rethrown.
  int _readHeader(List<int> buffer, int offset) {
    final prefixSize = RpcConstants.messagePrefixSize;
    final prefix = Uint8List(prefixSize)
      ..setRange(0, prefixSize, buffer, offset);
    try {
      final header = RpcMessageFrame.parseHeader(prefix);
      final length = header.messageLength;
      _state.isCompressed = header.isCompressed;
      _state.expectedMessageLength = length;
      return length;
    } catch (e, trace) {
      _logger?.error(
        'Failed to parse frame header: $e',
        error: e,
        stackTrace: trace,
      );
      rethrow;
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc