• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nogipx / rpc_dart / 15788067207

20 Jun 2025 09:08PM UTC coverage: 68.718% (-7.4%) from 76.097%
15788067207

push

github

nogipx
improve tests

2524 of 3673 relevant lines covered (68.72%)

5.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.99
/lib/src/rpc/streams/base_processor.dart
1
// SPDX-FileCopyrightText: 2025 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
2
//
3
// SPDX-License-Identifier: LGPL-3.0-or-later
4

5
part of '_index.dart';
6

7
/// Универсальный процессор для обработки стримов.
8
///
9
/// Автоматически определяет режим работы:
10
/// - Zero-copy для RpcInMemoryTransport (кодеки не нужны)
11
/// - Сериализация для сетевых транспортов (кодеки обязательны)
12
///
13
/// Преимущества:
14
/// - Нет race condition с транспортом
15
/// - Переиспользование логики между типами стримов
16
/// - Четкое разделение ответственности
17
/// - Работа с любыми типами объектов (не только IRpcSerializable)
18
/// - Автоматическая оптимизация для in-memory транспорта
19
final class StreamProcessor<TRequest extends Object, TResponse extends Object> {
20
  final RpcLogger? _logger;
21
  final IRpcTransport _transport;
22
  final int _streamId;
23
  final String _serviceName;
24
  final String _methodName;
25
  final IRpcCodec<TRequest>? _requestCodec;
26
  final IRpcCodec<TResponse>? _responseCodec;
27

28
  /// Парсер для обработки фрагментированных сообщений (только для сериализации)
29
  RpcMessageParser? _parser;
30

31
  /// Режим работы процессора
32
  final bool _isZeroCopy;
33

34
  /// Контроллер потока входящих запросов
35
  final StreamController<TRequest> _requestController =
36
      StreamController<TRequest>();
37

38
  /// Контроллер потока исходящих ответов
39
  final StreamController<TResponse> _responseController =
40
      StreamController<TResponse>();
41

42
  /// Подписка на входящий поток сообщений
43
  StreamSubscription? _messageSubscription;
44

45
  /// Флаг активности процессора
46
  bool _isActive = true;
47

48
  /// Флаг отправки начальных метаданных
49
  bool _initialMetadataSent = false;
50

51
  /// Путь метода в формате /ServiceName/MethodName
52
  late final String _methodPath;
53

54
  StreamProcessor({
10✔
55
    required IRpcTransport transport,
56
    required int streamId,
57
    required String serviceName,
58
    required String methodName,
59
    IRpcCodec<TRequest>? requestCodec,
60
    IRpcCodec<TResponse>? responseCodec,
61
    RpcLogger? logger,
62
  })  : _transport = transport,
63
        _streamId = streamId,
64
        _serviceName = serviceName,
65
        _methodName = methodName,
66
        _isZeroCopy = requestCodec == null && responseCodec == null,
67
        _requestCodec = requestCodec,
68
        _responseCodec = responseCodec,
69
        _logger = logger?.child('StreamProcessor') {
6✔
70
    // Валидация: для режима сериализации кодеки обязательны
71
    if (!_isZeroCopy) {
10✔
72
      if (_requestCodec == null || _responseCodec == null) {
16✔
73
        throw ArgumentError(
×
74
          'Кодеки обязательны для режима сериализации. '
75
          'Для zero-copy не передавайте кодеки (null).',
76
        );
77
      }
78
      _parser = RpcMessageParser(logger: _logger);
24✔
79
    } else {
80
      // Zero-copy режим: требуется поддержка zero-copy транспортом
81
      if (!transport.supportsZeroCopy) {
2✔
82
        throw ArgumentError(
×
83
          'Zero-copy режим требует транспорт с поддержкой zero-copy. '
84
          'Для сетевых транспортов передайте кодеки.',
85
        );
86
      }
87
    }
88

89
    _methodPath = '/$_serviceName/$_methodName';
40✔
90

91
    _logger?.internal(
16✔
92
        'Создан ${_isZeroCopy ? "Zero-copy" : "Serialized"} StreamProcessor для $_methodPath [streamId: $_streamId]');
24✔
93
    _setupResponseHandler();
10✔
94
  }
95

96
  /// Поток входящих запросов от клиента
97
  Stream<TRequest> get requests => _requestController.stream;
30✔
98

99
  /// Активен ли процессор
100
  bool get isActive => _isActive;
2✔
101

102
  /// Режим zero-copy
103
  bool get isZeroCopy => _isZeroCopy;
×
104

105
  /// Настраивает обработку исходящих ответов
106
  void _setupResponseHandler() {
10✔
107
    _responseController.stream.listen(
30✔
108
      (response) async {
10✔
109
        if (!_isActive) return;
10✔
110

111
        _logger?.internal(
16✔
112
            'Отправка ответа для $_methodPath [streamId: $_streamId]');
18✔
113
        try {
114
          if (_isZeroCopy) {
10✔
115
            // Zero-copy путь
116
            _logger
2✔
117
                ?.internal('Zero-copy отправка ответа [streamId: $_streamId]');
6✔
118
            await _transport.sendDirectObject(
4✔
119
              _streamId,
2✔
120
              response,
121
            );
122
            _logger?.internal(
4✔
123
                'Zero-copy ответ отправлен для $_methodPath [streamId: $_streamId]');
6✔
124
          } else {
125
            // Сериализация для сетевых транспортов
126
            final serialized = _responseCodec!.serialize(response);
16✔
127
            _logger?.internal(
12✔
128
                'Ответ сериализован, размер: ${serialized.length} байт [streamId: $_streamId]');
12✔
129

130
            final framedMessage = RpcMessageFrame.encode(serialized);
8✔
131
            await _transport.sendMessage(_streamId, framedMessage);
24✔
132

133
            _logger?.internal(
12✔
134
                'Ответ отправлен для $_methodPath [streamId: $_streamId]');
12✔
135
          }
136
        } catch (e, stackTrace) {
137
          // Проверяем, не закрыт ли транспорт
138
          if (e.toString().contains('Transport is closed') ||
×
139
              e.toString().contains('closed')) {
×
140
            _logger?.internal(
×
141
                'Транспорт закрыт, пропускаем отправку ответа [streamId: $_streamId]');
×
142
            return;
143
          }
144
          _logger?.error('Ошибка при отправке ответа [streamId: $_streamId]',
×
145
              error: e, stackTrace: stackTrace);
146
        }
147
      },
148
      onDone: () async {
10✔
149
        if (!_isActive) return;
10✔
150

151
        try {
152
          final trailers = RpcMetadata.forTrailer(RpcStatus.OK);
9✔
153
          await _transport.sendMetadata(_streamId, trailers, endStream: true);
27✔
154
          _logger?.internal(
15✔
155
              'Трейлер отправлен для $_methodPath [streamId: $_streamId]');
18✔
156
        } catch (e, stackTrace) {
157
          // Проверяем, не закрыт ли транспорт
158
          if (e.toString().contains('Transport is closed') ||
×
159
              e.toString().contains('closed')) {
×
160
            _logger?.internal(
×
161
                'Транспорт закрыт, пропускаем отправку трейлера [streamId: $_streamId]');
×
162
            return;
163
          }
164
          _logger?.error('Ошибка при отправке трейлера [streamId: $_streamId]',
×
165
              error: e, stackTrace: stackTrace);
166
        }
167
      },
168
      onError: (error, stackTrace) {
×
169
        _logger?.error(
×
170
            'Ошибка в потоке ответов для $_methodPath [streamId: $_streamId]',
×
171
            error: error,
172
            stackTrace: stackTrace);
173
      },
174
    );
175
  }
176

177
  /// Привязывает процессор к потоку сообщений от endpoint'а
178
  void bindToMessageStream(Stream<RpcTransportMessage> messageStream) {
10✔
179
    if (_messageSubscription != null) {
10✔
180
      _logger?.logRpcWarning(
1✔
181
        message: 'Stream processor already bound to message stream',
182
        methodPath: _methodPath,
×
183
        streamId: _streamId,
×
184
      );
185
      return;
186
    }
187

188
    _logger?.logStreamBound(
16✔
189
      methodPath: _methodPath,
6✔
190
      streamId: _streamId,
6✔
191
    );
192

193
    _messageSubscription = messageStream.listen(
20✔
194
      _handleMessage,
10✔
195
      onError: (error, stackTrace) {
×
196
        _logger?.logRpcError(
×
197
          operation: 'message_stream_listen',
198
          error: error,
199
          stackTrace: stackTrace,
200
          methodPath: _methodPath,
×
201
          streamId: _streamId,
×
202
        );
203
        if (!_requestController.isClosed) {
×
204
          _requestController.addError(error, stackTrace);
×
205
        }
206
      },
207
      onDone: () {
7✔
208
        _logger?.logStreamFinished(
13✔
209
          methodPath: _methodPath,
6✔
210
          streamId: _streamId,
6✔
211
          reason: 'message_stream_completed',
212
        );
213
        if (!_requestController.isClosed) {
14✔
214
          _requestController.close();
14✔
215
        }
216
      },
217
    );
218

219
    // НЕ отправляем начальные метаданные при подключении
220
    // Они будут отправлены при первом успешном ответе
221
    // или пропущены при ошибке (error response отправляется напрямую)
222
  }
223

224
  /// Обрабатывает входящее сообщение
225
  void _handleMessage(RpcTransportMessage message) {
10✔
226
    if (!_isActive) return;
10✔
227

228
    _logger?.logMessageReceived(
16✔
229
      streamId: message.streamId,
6✔
230
      messageType: message.isMetadataOnly
6✔
231
          ? 'metadata'
232
          : message.isDirect
6✔
233
              ? 'zero_copy'
234
              : 'serialized',
235
      payloadSize: message.payload?.length,
10✔
236
      isDirectPayload: message.isDirect,
6✔
237
    );
238

239
    // Zero-copy: обрабатываем прямой объект
240
    if (message.isDirect && message.directPayload != null) {
12✔
241
      _processDirectMessage(message.directPayload!);
4✔
242
    }
243
    // Обрабатываем сообщения с данными (стандартная сериализация)
244
    else if (!message.isMetadataOnly && message.payload != null) {
16✔
245
      _processDataMessage(message.payload!);
16✔
246
    }
247

248
    // Обрабатываем конец потока
249
    if (message.isEndOfStream) {
10✔
250
      _logger?.logStreamFinished(
15✔
251
        methodPath: _methodPath,
6✔
252
        streamId: _streamId,
6✔
253
        reason: 'end_of_stream_received',
254
      );
255
      if (!_requestController.isClosed) {
18✔
256
        _requestController.close();
18✔
257
      }
258
    }
259
  }
260

261
  /// Zero-copy: обрабатывает прямой объект без сериализации
262
  void _processDirectMessage(Object directPayload) {
2✔
263
    try {
264
      final request = directPayload as TRequest;
265

266
      if (!_requestController.isClosed) {
4✔
267
        _requestController.add(request);
4✔
268
      } else {
269
        _logger?.logRpcWarning(
×
270
          message: 'Cannot add request to closed controller (zero-copy)',
271
          methodPath: _methodPath,
×
272
          streamId: _streamId,
×
273
          metadata: {'transport_type': 'zero_copy'},
×
274
        );
275
      }
276
    } catch (e, stackTrace) {
277
      _logger?.logRpcError(
×
278
        operation: 'zero_copy_direct_object_processing',
279
        error: e,
280
        stackTrace: stackTrace,
281
        methodPath: _methodPath,
×
282
        streamId: _streamId,
×
283
        metadata: {'object_type': directPayload.runtimeType.toString()},
×
284
      );
285
      if (!_requestController.isClosed) {
×
286
        _requestController.addError(e, stackTrace);
×
287
      }
288
    }
289
  }
290

291
  /// Обрабатывает сообщение с данными (только для режима сериализации)
292
  void _processDataMessage(List<int> messageBytes) {
8✔
293
    if (_isZeroCopy) {
8✔
294
      _logger?.logRpcWarning(
×
295
        message: 'Serialized message received in zero-copy mode, ignoring',
296
        methodPath: _methodPath,
×
297
        streamId: _streamId,
×
298
      );
299
      return;
300
    }
301

302
    _logger?.logMessageReceived(
12✔
303
      streamId: _streamId,
4✔
304
      messageType: 'serialized_data',
305
      payloadSize: messageBytes.length,
4✔
306
    );
307

308
    try {
309
      // Конвертируем List<int> в Uint8List для парсера
310
      final uint8Message = messageBytes is Uint8List
8✔
311
          ? messageBytes
312
          : Uint8List.fromList(messageBytes);
×
313

314
      final messages = _parser!(uint8Message);
16✔
315

316
      for (var msgBytes in messages) {
16✔
317
        try {
318
          final request = _requestCodec!.deserialize(msgBytes);
16✔
319

320
          if (!_requestController.isClosed) {
16✔
321
            _requestController.add(request);
16✔
322
          } else {
323
            _logger?.logRpcWarning(
×
324
              message: 'Cannot add request to closed controller',
325
              methodPath: _methodPath,
×
326
              streamId: _streamId,
×
327
              metadata: {'message_size': msgBytes.length},
×
328
            );
329
          }
330
        } catch (e, stackTrace) {
331
          _logger?.logRpcError(
×
332
            operation: 'request_deserialization',
333
            error: e,
334
            stackTrace: stackTrace,
335
            methodPath: _methodPath,
×
336
            streamId: _streamId,
×
337
            metadata: {'message_size': msgBytes.length},
×
338
          );
339
          if (!_requestController.isClosed) {
×
340
            _requestController.addError(e, stackTrace);
×
341
          }
342
        }
343
      }
344
    } catch (e, stackTrace) {
345
      _logger?.logRpcError(
×
346
        operation: 'message_parsing',
347
        error: e,
348
        stackTrace: stackTrace,
349
        methodPath: _methodPath,
×
350
        streamId: _streamId,
×
351
        metadata: {'message_size': messageBytes.length},
×
352
      );
353
      if (!_requestController.isClosed) {
×
354
        _requestController.addError(e, stackTrace);
×
355
      }
356
    }
357
  }
358

359
  /// Отправляет ответ клиенту
360
  Future<void> send(TResponse response) async {
10✔
361
    if (!_isActive) {
10✔
362
      _logger?.warning('Попытка отправить ответ в неактивный процессор');
2✔
363
      return;
364
    }
365

366
    if (!_responseController.isClosed) {
20✔
367
      _responseController.add(response);
20✔
368
    } else {
369
      _logger?.warning('Попытка отправить ответ в закрытый контроллер');
×
370
    }
371
  }
372

373
  /// Отправляет ошибку клиенту
374
  Future<void> sendError(int statusCode, String message) async {
3✔
375
    if (!_isActive) {
3✔
376
      _logger?.warning('Попытка отправить ошибку в неактивный процессор');
1✔
377
      return;
378
    }
379

380
    _logger?.error(
3✔
381
        'Отправка ошибки клиенту: $statusCode - $message [streamId: $_streamId]');
×
382

383
    if (!_responseController.isClosed) {
6✔
384
      _responseController.close();
6✔
385
    }
386

387
    try {
388
      // Если начальные метаданные не были отправлены, отправляем error response сразу
389
      if (!_initialMetadataSent) {
3✔
390
        _logger?.internal(
3✔
391
            'Отправка ошибки без начальных метаданных [streamId: $_streamId]');
×
392
        // Создаем комбинированные метаданные: начальный response + error trailer
393
        final errorHeaders = [
3✔
394
          RpcHeader(':status', '200'), // HTTP 200 для gRPC
3✔
395
          RpcHeader(
3✔
396
              RpcConstants.CONTENT_TYPE_HEADER, RpcConstants.GRPC_CONTENT_TYPE),
397
          RpcHeader(RpcConstants.GRPC_STATUS_HEADER, statusCode.toString()),
6✔
398
        ];
399

400
        if (message.isNotEmpty) {
3✔
401
          errorHeaders
402
              .add(RpcHeader(RpcConstants.GRPC_MESSAGE_HEADER, message));
6✔
403
        }
404

405
        final errorMetadata = RpcMetadata(errorHeaders);
3✔
406
        await _transport.sendMetadata(_streamId, errorMetadata,
9✔
407
            endStream: true);
408
        _initialMetadataSent = true;
3✔
409
      } else {
410
        // Начальные метаданные уже отправлены, отправляем только trailer
411
        final trailers = RpcMetadata.forTrailer(statusCode, message: message);
×
412
        await _transport.sendMetadata(_streamId, trailers, endStream: true);
×
413
      }
414

415
      _logger?.internal('Ошибка отправлена клиенту [streamId: $_streamId]');
3✔
416
    } catch (e, stackTrace) {
417
      // Проверяем, не закрыт ли транспорт
418
      if (e.toString().contains('Transport is closed') ||
×
419
          e.toString().contains('closed')) {
×
420
        _logger?.internal(
×
421
            'Транспорт закрыт, пропускаем отправку ошибки [streamId: $_streamId]');
×
422
        return;
423
      }
424
      _logger?.error(
×
425
          'Ошибка при отправке ошибки клиенту [streamId: $_streamId]',
×
426
          error: e,
427
          stackTrace: stackTrace);
428
    }
429
  }
430

431
  /// Завершает отправку ответов
432
  Future<void> finishSending() async {
9✔
433
    if (!_isActive) return;
9✔
434

435
    _logger?.internal(
15✔
436
        'Завершение отправки ответов для $_methodPath [streamId: $_streamId]');
18✔
437

438
    if (!_responseController.isClosed) {
18✔
439
      await _responseController.close();
18✔
440
    }
441
  }
442

443
  /// Закрывает процессор и освобождает ресурсы
444
  Future<void> close() async {
4✔
445
    if (!_isActive) return;
4✔
446

447
    _logger?.internal(
4✔
448
        'Закрытие StreamProcessor для $_methodPath [streamId: $_streamId]');
×
449
    _isActive = false;
4✔
450

451
    await _messageSubscription?.cancel();
8✔
452
    _messageSubscription = null;
4✔
453

454
    if (!_requestController.isClosed) {
8✔
455
      _requestController.close();
8✔
456
    }
457

458
    if (!_responseController.isClosed) {
8✔
459
      _responseController.close();
8✔
460
    }
461
  }
462
}
463

464
/// Универсальный процессор для клиентских вызовов RPC стримов.
465
///
466
/// Автоматически определяет режим работы:
467
/// - Zero-copy для RpcInMemoryTransport (кодеки не нужны)
468
/// - Сериализация для сетевых транспортов (кодеки обязательны)
469
///
470
/// Преимущества:
471
/// - Переиспользование кода между типами стримов
472
/// - Отсутствие race condition
473
/// - Четкое разделение ответственности
474
/// - Тестируемость без внепроцессных зависимостей
475
/// - Работа с любыми типами объектов (не только IRpcSerializable)
476
/// - Автоматическая оптимизация для in-memory транспорта
477
final class CallProcessor<TRequest extends Object, TResponse extends Object> {
478
  final RpcLogger? _logger;
479
  final IRpcTransport _transport;
480
  final int _streamId;
481
  final String _serviceName;
482
  final String _methodName;
483
  final IRpcCodec<TRequest>? _requestCodec;
484
  final IRpcCodec<TResponse>? _responseCodec;
485

486
  /// RPC контекст для передачи метаданных, таймаутов и отмены
487
  final RpcContext? _context;
488

489
  /// Парсер для обработки фрагментированных сообщений (только для сериализации)
490
  RpcMessageParser? _parser;
491

492
  /// Режим работы процессора
493
  final bool _isZeroCopy;
494

495
  /// Контроллер потока исходящих запросов
496
  final StreamController<TRequest> _requestController =
497
      StreamController<TRequest>();
498

499
  /// Контроллер потока входящих ответов
500
  final StreamController<RpcMessage<TResponse>> _responseController =
501
      StreamController<RpcMessage<TResponse>>();
502

503
  /// Подписка на исходящие запросы
504
  StreamSubscription? _requestSubscription;
505

506
  /// Подписка на входящие ответы
507
  StreamSubscription? _responseSubscription;
508

509
  /// Флаг активности процессора
510
  bool _isActive = true;
511

512
  /// Флаг отправки начальных метаданных
513
  bool _initialMetadataSent = false;
514

515
  /// Путь метода в формате /ServiceName/MethodName
516
  late final String _methodPath;
517

518
  CallProcessor({
10✔
519
    required IRpcTransport transport,
520
    required String serviceName,
521
    required String methodName,
522
    IRpcCodec<TRequest>? requestCodec,
523
    IRpcCodec<TResponse>? responseCodec,
524
    RpcContext? context,
525
    RpcLogger? logger,
526
  })  : _transport = transport,
527
        _streamId = transport.createStream(),
10✔
528
        _serviceName = serviceName,
529
        _methodName = methodName,
530
        _isZeroCopy = requestCodec == null && responseCodec == null,
531
        _requestCodec = requestCodec,
532
        _responseCodec = responseCodec,
533
        _context = context,
534
        _logger = logger?.child('CallProcessor') {
6✔
535
    // Валидация: для режима сериализации кодеки обязательны
536
    if (!_isZeroCopy) {
10✔
537
      if (_requestCodec == null || _responseCodec == null) {
16✔
538
        throw ArgumentError(
×
539
          'Кодеки обязательны для режима сериализации. '
540
          'Для zero-copy не передавайте кодеки (null).',
541
        );
542
      }
543
      _parser = RpcMessageParser(logger: _logger);
24✔
544
    } else {
545
      // Zero-copy режим: требуется поддержка zero-copy транспортом
546
      if (!transport.supportsZeroCopy) {
2✔
547
        throw ArgumentError(
×
548
          'Zero-copy режим требует транспорт с поддержкой zero-copy. '
549
          'Для сетевых транспортов передайте кодеки.',
550
        );
551
      }
552
    }
553

554
    _methodPath = '/$_serviceName/$_methodName';
40✔
555

556
    _logger?.internal(
16✔
557
        'Создан ${_isZeroCopy ? "Zero-copy" : "Serialized"} CallProcessor для $_methodPath [streamId: $_streamId]');
24✔
558

559
    // Проверяем контекст перед началом работы
560
    _checkContextBeforeCall();
10✔
561

562
    _setupRequestHandler();
10✔
563
    _setupResponseHandler();
10✔
564
  }
565

566
  /// Поток входящих ответов от сервера
567
  Stream<RpcMessage<TResponse>> get responses => _responseController.stream;
30✔
568

569
  /// Активен ли процессор
570
  bool get isActive => _isActive;
2✔
571

572
  /// ID стрима
573
  int get streamId => _streamId;
2✔
574

575
  /// Режим zero-copy
576
  bool get isZeroCopy => _isZeroCopy;
×
577

578
  /// Настраивает обработку исходящих запросов
579
  void _setupRequestHandler() {
10✔
580
    _requestSubscription = _requestController.stream.listen(
40✔
581
      (request) async {
10✔
582
        if (!_isActive) return;
10✔
583

584
        try {
585
          // Отправляем начальные метаданные при первом запросе
586
          if (!_initialMetadataSent) {
10✔
587
            await _sendInitialMetadata();
10✔
588
            _initialMetadataSent = true;
10✔
589
          }
590

591
          _logger?.internal(
16✔
592
              'Отправка запроса для $_methodPath [streamId: $_streamId]');
18✔
593

594
          if (_isZeroCopy) {
10✔
595
            // Zero-copy путь
596
            _logger
2✔
597
                ?.internal('Zero-copy отправка запроса [streamId: $_streamId]');
6✔
598
            await _transport.sendDirectObject(
4✔
599
              _streamId,
2✔
600
              request,
601
            );
602
            _logger?.internal(
4✔
603
                'Zero-copy запрос отправлен для $_methodPath [streamId: $_streamId]');
6✔
604
          } else {
605
            // Сериализация для сетевых транспортов
606
            final serialized = _requestCodec!.serialize(request);
16✔
607
            _logger?.internal(
12✔
608
                'Запрос сериализован, размер: ${serialized.length} байт [streamId: $_streamId]');
12✔
609

610
            final framedMessage = RpcMessageFrame.encode(serialized);
8✔
611
            await _transport.sendMessage(_streamId, framedMessage);
24✔
612

613
            _logger?.internal(
12✔
614
                'Запрос отправлен для $_methodPath [streamId: $_streamId]');
12✔
615
          }
616
        } catch (e, stackTrace) {
617
          _logger?.error('Ошибка при отправке запроса [streamId: $_streamId]',
4✔
618
              error: e, stackTrace: stackTrace);
619
          if (!_responseController.isClosed) {
2✔
620
            _responseController.addError(e, stackTrace);
2✔
621
          }
622

623
          // 🔥 КРИТИЧЕСКОЕ ИСПРАВЛЕНИЕ: При ошибке роутинга немедленно завершаем обработку
624
          // Это предотвратит дальнейшую отправку запросов и заставит stream завершиться с ошибкой
625
          if (!_requestController.isClosed) {
2✔
626
            _requestController.close();
×
627
          }
628
        }
629
      },
630
      onDone: () async {
9✔
631
        if (!_isActive) return;
9✔
632

633
        try {
634
          await _transport.finishSending(_streamId);
27✔
635
          _logger?.internal(
15✔
636
              'finishSending выполнен для $_methodPath [streamId: $_streamId]');
18✔
637
        } catch (e, stackTrace) {
638
          _logger?.error(
×
639
              'Ошибка при завершении отправки запросов [streamId: $_streamId]',
×
640
              error: e,
641
              stackTrace: stackTrace);
642
        }
643
      },
644
      onError: (error, stackTrace) {
×
645
        _logger?.error(
×
646
            'Ошибка в потоке запросов для $_methodPath [streamId: $_streamId]',
×
647
            error: error,
648
            stackTrace: stackTrace);
649
        if (!_responseController.isClosed) {
×
650
          _responseController.addError(error, stackTrace);
×
651
        }
652
      },
653
    );
654
  }
655

656
  /// Настраивает обработку входящих ответов
657
  void _setupResponseHandler() {
10✔
658
    _responseSubscription = _transport.getMessagesForStream(_streamId).listen(
50✔
659
      _handleResponse,
10✔
660
      onError: (error, stackTrace) {
×
661
        _logger?.error('Ошибка в потоке ответов',
×
662
            error: error, stackTrace: stackTrace);
663
        if (!_responseController.isClosed) {
×
664
          _responseController.addError(error, stackTrace);
×
665
        }
666
      },
667
      onDone: () {
7✔
668
        _logger?.internal(
13✔
669
            'Поток ответов завершен для $_methodPath [streamId: $_streamId]');
18✔
670
        if (!_responseController.isClosed) {
14✔
671
          _responseController.close();
2✔
672
        }
673
      },
674
    );
675
  }
676

677
  /// Отправляет начальные метаданные с поддержкой контекста
678
  Future<void> _sendInitialMetadata() async {
10✔
679
    _logger?.internal(
16✔
680
        'Отправка начальных метаданных для $_methodPath [streamId: $_streamId]');
18✔
681

682
    final baseMetadata =
683
        RpcMetadata.forClientRequest(_serviceName, _methodName);
30✔
684

685
    // Создаем новые метаданные с заголовками из контекста
686
    final headers = List<RpcHeader>.from(baseMetadata.headers);
20✔
687

688
    if (_context != null) {
10✔
689
      // Добавляем пользовательские заголовки из контекста
690
      for (final entry in _context!.headers.entries) {
24✔
691
        headers.add(RpcHeader(entry.key, entry.value));
24✔
692
      }
693

694
      // Добавляем специальные заголовки RPC
695
      if (_context!.traceId != null) {
12✔
696
        headers.add(RpcHeader('x-trace-id', _context!.traceId!));
24✔
697
      }
698
      headers.add(RpcHeader('x-request-id', _context!.requestId));
24✔
699

700
      // Context values остаются ЛОКАЛЬНЫМИ - не передаются через сеть (соответствует стандарту gRPC)
701
      // Только headers передаются через HTTP/2 заголовки
702

703
      _logger?.internal(
12✔
704
          'Добавлены заголовки контекста: ${_context!.headers.length} пользовательских + системные [streamId: $_streamId]');
30✔
705
    } else {
706
      // Даже для null контекста добавляем базовый request-id
707
      final requestId =
708
          RpcContext.empty().requestId; // Генерируем базовый request-id
8✔
709
      headers.add(RpcHeader('x-request-id', requestId));
8✔
710

711
      _logger?.internal(
4✔
712
          'Добавлен базовый request-id для null контекста [streamId: $_streamId]');
×
713
    }
714

715
    final metadata = RpcMetadata(headers);
10✔
716
    await _transport.sendMetadata(_streamId, metadata);
30✔
717

718
    _logger?.internal(
16✔
719
        'Начальные метаданные отправлены для $_methodPath [streamId: $_streamId]');
18✔
720
  }
721

722
  /// Проверяет состояние контекста перед вызовом
723
  void _checkContextBeforeCall() {
10✔
724
    if (_context == null) return;
10✔
725

726
    // Проверяем отмену
727
    _context!.cancellationToken?.throwIfCancelled();
12✔
728

729
    // Проверяем deadline
730
    if (_context!.isExpired) {
12✔
731
      throw RpcDeadlineExceededException(
×
732
        _context!.deadline!,
×
733
        Duration.zero,
734
      );
735
    }
736

737
    _logger?.internal(
12✔
738
        'Контекст проверен: requestId=${_context!.requestId}, traceId=${_context!.traceId} [streamId: $_streamId]');
36✔
739
  }
740

741
  /// Обрабатывает входящий ответ
742
  void _handleResponse(RpcTransportMessage message) {
10✔
743
    if (!_isActive) return;
10✔
744

745
    _logger?.internal(
16✔
746
        'Обработка ответа [streamId: ${message.streamId}, isMetadataOnly: ${message.isMetadataOnly}, hasPayload: ${message.payload != null}, isDirect: ${message.isDirect}]');
30✔
747

748
    try {
749
      // Обрабатываем метаданные
750
      if (message.isMetadataOnly) {
10✔
751
        final rpcMessage = RpcMessage.withMetadata<TResponse>(
9✔
752
          message.metadata!,
9✔
753
          isEndOfStream: message.isEndOfStream,
9✔
754
        );
755

756
        if (!_responseController.isClosed) {
18✔
757
          _responseController.add(rpcMessage);
18✔
758
          _logger?.internal(
15✔
759
              'Метаданные добавлены в поток ответов [streamId: $_streamId]');
12✔
760
        }
761
      }
762

763
      // Zero-copy: обрабатываем прямой объект
764
      if (message.isDirect && message.directPayload != null) {
12✔
765
        _processDirectResponse(message.directPayload!);
4✔
766
      }
767
      // Обрабатываем сообщения с данными (стандартная сериализация)
768
      else if (!message.isMetadataOnly && message.payload != null) {
18✔
769
        _processResponseData(message.payload!);
16✔
770
      }
771

772
      // Завершаем поток при получении END_STREAM
773
      if (message.isEndOfStream) {
10✔
774
        _logger?.internal(
15✔
775
            'Получен END_STREAM, закрываем поток ответов [streamId: $_streamId]');
12✔
776
        if (!_responseController.isClosed) {
18✔
777
          _responseController.close();
18✔
778
        }
779
      }
780
    } catch (e, stackTrace) {
781
      _logger?.error('Ошибка при обработке ответа [streamId: $_streamId]',
×
782
          error: e, stackTrace: stackTrace);
783
      if (!_responseController.isClosed) {
×
784
        _responseController.addError(e, stackTrace);
×
785
      }
786
    }
787
  }
788

789
  /// Zero-copy: обрабатывает прямой объект ответа без сериализации
790
  void _processDirectResponse(Object directPayload) {
2✔
791
    _logger?.internal(
4✔
792
        'Zero-copy обработка прямого ответа [streamId: $_streamId, type: ${directPayload.runtimeType}]');
6✔
793

794
    try {
795
      final response = directPayload as TResponse;
796
      final rpcMessage = RpcMessage.withPayload<TResponse>(response);
2✔
797

798
      if (!_responseController.isClosed) {
4✔
799
        _responseController.add(rpcMessage);
4✔
800
        _logger?.internal(
4✔
801
            'Zero-copy ответ добавлен в поток ответов [streamId: $_streamId]');
4✔
802
      } else {
803
        _logger?.warning(
×
804
            'Zero-copy: не могу добавить ответ в закрытый контроллер [streamId: $_streamId]');
×
805
      }
806
    } catch (e, stackTrace) {
807
      _logger?.error(
×
808
          'Zero-copy ошибка при обработке прямого ответа [streamId: $_streamId]',
×
809
          error: e,
810
          stackTrace: stackTrace);
811
      if (!_responseController.isClosed) {
×
812
        _responseController.addError(e, stackTrace);
×
813
      }
814
    }
815
  }
816

817
  /// Обрабатывает данные ответа (только для режима сериализации)
818
  void _processResponseData(List<int> messageBytes) {
8✔
819
    if (_isZeroCopy) {
8✔
820
      _logger?.logRpcWarning(
×
821
        message: 'Serialized response received in zero-copy mode, ignoring',
822
        methodPath: _methodPath,
×
823
        streamId: _streamId,
×
824
      );
825
      return;
826
    }
827

828
    _logger?.internal(
12✔
829
        'Получен ответ размером: ${messageBytes.length} байт [streamId: $_streamId]');
12✔
830

831
    try {
832
      final uint8Message = messageBytes is Uint8List
8✔
833
          ? messageBytes
834
          : Uint8List.fromList(messageBytes);
×
835

836
      final messages = _parser!(uint8Message);
16✔
837
      _logger?.internal(
12✔
838
          'Парсер извлек ${messages.length} сообщений из фрейма [streamId: $_streamId]');
12✔
839

840
      for (var msgBytes in messages) {
16✔
841
        try {
842
          _logger?.internal(
12✔
843
              'Десериализация ответа размером ${msgBytes.length} байт [streamId: $_streamId]');
12✔
844
          final response = _responseCodec!.deserialize(msgBytes);
16✔
845

846
          final rpcMessage = RpcMessage.withPayload<TResponse>(response);
8✔
847

848
          if (!_responseController.isClosed) {
16✔
849
            _responseController.add(rpcMessage);
16✔
850
            _logger?.internal(
12✔
851
                'Ответ десериализован и добавлен в поток ответов [streamId: $_streamId]');
8✔
852
          } else {
853
            _logger?.warning(
×
854
                'Не могу добавить ответ в закрытый контроллер [streamId: $_streamId]');
×
855
          }
856
        } catch (e, stackTrace) {
857
          _logger?.error(
×
858
              'Ошибка при десериализации ответа [streamId: $_streamId]',
×
859
              error: e,
860
              stackTrace: stackTrace);
861
          if (!_responseController.isClosed) {
×
862
            _responseController.addError(e, stackTrace);
×
863
          }
864
        }
865
      }
866
    } catch (e, stackTrace) {
867
      _logger?.error('Ошибка при парсинге ответа [streamId: $_streamId]',
×
868
          error: e, stackTrace: stackTrace);
869
      if (!_responseController.isClosed) {
×
870
        _responseController.addError(e, stackTrace);
×
871
      }
872
    }
873
  }
874

875
  /// Отправляет запрос серверу
876
  Future<void> send(TRequest request) async {
10✔
877
    if (!_isActive) {
10✔
878
      _logger?.warning('Попытка отправить запрос в неактивный процессор');
1✔
879
      return;
880
    }
881

882
    if (!_requestController.isClosed) {
20✔
883
      _requestController.add(request);
20✔
884
    } else {
885
      _logger?.warning('Попытка отправить запрос в закрытый контроллер');
×
886
    }
887
  }
888

889
  /// Завершает отправку запросов
890
  Future<void> finishSending() async {
9✔
891
    if (!_isActive) return;
9✔
892

893
    _logger?.internal(
15✔
894
        'Завершение отправки запросов для $_methodPath [streamId: $_streamId]');
18✔
895

896
    if (!_requestController.isClosed) {
18✔
897
      await _requestController.close();
18✔
898
    }
899
  }
900

901
  /// Закрывает процессор и освобождает ресурсы
902
  Future<void> close() async {
10✔
903
    if (!_isActive) return;
10✔
904

905
    _logger?.internal(
16✔
906
        'Закрытие CallProcessor для $_methodPath [streamId: $_streamId]');
18✔
907
    _isActive = false;
10✔
908

909
    await _requestSubscription?.cancel();
20✔
910
    _requestSubscription = null;
10✔
911

912
    await _responseSubscription?.cancel();
20✔
913
    _responseSubscription = null;
10✔
914

915
    if (!_requestController.isClosed) {
20✔
916
      _requestController.close();
20✔
917
    }
918

919
    if (!_responseController.isClosed) {
20✔
920
      _responseController.close();
20✔
921
    }
922
  }
923
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc