• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nogipx / rpc_dart / 16231688622

11 Jul 2025 11:35PM UTC coverage: 67.709% (-1.4%) from 69.114%
16231688622

push

github

nogipx
fix timeout cancellation and token registrations

41 of 59 new or added lines in 5 files covered. (69.49%)

408 existing lines in 8 files now uncovered.

2663 of 3933 relevant lines covered (67.71%)

5.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.04
/lib/src/rpc/streams/base_processor.dart
1
// SPDX-FileCopyrightText: 2025 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
2
//
3
// SPDX-License-Identifier: MIT
4

5
part of '_index.dart';
6

7
/// Универсальный процессор для обработки стримов.
8
///
9
/// Автоматически определяет режим работы:
10
/// - Zero-copy для RpcInMemoryTransport (кодеки не нужны)
11
/// - Сериализация для сетевых транспортов (кодеки обязательны)
12
///
13
/// Преимущества:
14
/// - Нет race condition с транспортом
15
/// - Переиспользование логики между типами стримов
16
/// - Четкое разделение ответственности
17
/// - Работа с любыми типами объектов (не только IRpcSerializable)
18
/// - Автоматическая оптимизация для in-memory транспорта
19
/// - Поддержка cancellation token для прерывания операций
20
final class StreamProcessor<TRequest extends Object, TResponse extends Object> {
21
  final RpcLogger? _logger;
22
  final IRpcTransport _transport;
23
  final int _streamId;
24
  final String _serviceName;
25
  final String _methodName;
26
  final IRpcCodec<TRequest>? _requestCodec;
27
  final IRpcCodec<TResponse>? _responseCodec;
28

29
  /// RPC контекст с токеном отмены и метаданными
30
  final RpcContext? _context;
31

32
  /// Подписка на отмену операции
33
  StreamSubscription? _cancellationSubscription;
34

35
  /// Парсер для обработки фрагментированных сообщений (только для сериализации)
36
  RpcMessageParser? _parser;
37

38
  /// Режим работы процессора
39
  final bool _isZeroCopy;
40

41
  /// Контроллер потока входящих запросов
42
  final StreamController<TRequest> _requestController =
43
      StreamController<TRequest>();
44

45
  /// Контроллер потока исходящих ответов
46
  final StreamController<TResponse> _responseController =
47
      StreamController<TResponse>();
48

49
  /// Подписка на входящий поток сообщений
50
  StreamSubscription? _messageSubscription;
51

52
  /// Флаг активности процессора
53
  bool _isActive = true;
54

55
  /// Флаг отправки начальных метаданных
56
  bool _initialMetadataSent = false;
57

58
  /// Путь метода в формате /ServiceName/MethodName
59
  late final String _methodPath;
60

61
  StreamProcessor({
10✔
62
    required IRpcTransport transport,
63
    required int streamId,
64
    required String serviceName,
65
    required String methodName,
66
    IRpcCodec<TRequest>? requestCodec,
67
    IRpcCodec<TResponse>? responseCodec,
68
    RpcContext? context,
69
    RpcLogger? logger,
70
  })  : _transport = transport,
71
        _streamId = streamId,
72
        _serviceName = serviceName,
73
        _methodName = methodName,
74
        _isZeroCopy = requestCodec == null && responseCodec == null,
75
        _requestCodec = requestCodec,
76
        _responseCodec = responseCodec,
77
        _context = context,
78
        _logger = logger?.child('StreamProcessor') {
6✔
79
    // Валидация: для режима сериализации кодеки обязательны
80
    if (!_isZeroCopy) {
10✔
81
      if (_requestCodec == null || _responseCodec == null) {
16✔
82
        throw ArgumentError(
×
83
          'Кодеки обязательны для режима сериализации. '
84
          'Для zero-copy не передавайте кодеки (null).',
85
        );
86
      }
87
      _parser = RpcMessageParser(logger: _logger);
24✔
88
    } else {
89
      // Zero-copy режим: требуется поддержка zero-copy транспортом
90
      if (!transport.supportsZeroCopy) {
2✔
UNCOV
91
        throw ArgumentError(
×
92
          'Zero-copy режим требует транспорт с поддержкой zero-copy. '
93
          'Для сетевых транспортов передайте кодеки.',
94
        );
95
      }
96
    }
97

98
    _methodPath = '/$_serviceName/$_methodName';
40✔
99

100
    _logger?.internal(
16✔
101
        'Создан ${_isZeroCopy ? "Zero-copy" : "Serialized"} StreamProcessor для $_methodPath [streamId: $_streamId]${_context?.cancellationToken != null ? " с cancellation token" : ""}');
36✔
102

103
    _setupCancellationMonitoring();
10✔
104
    _setupResponseHandler();
10✔
105
  }
106

107
  /// Поток входящих запросов от клиента
108
  Stream<TRequest> get requests => _requestController.stream;
30✔
109

110
  /// Активен ли процессор
111
  bool get isActive => _isActive;
2✔
112

113
  /// Режим zero-copy
UNCOV
114
  bool get isZeroCopy => _isZeroCopy;
×
115

116
  /// Настраивает обработку исходящих ответов
117
  void _setupResponseHandler() {
10✔
118
    _responseController.stream.listen(
30✔
119
      (response) async {
10✔
120
        if (!_isActive) return;
10✔
121

122
        _logger?.internal(
16✔
123
            'Отправка ответа для $_methodPath [streamId: $_streamId]');
18✔
124
        try {
125
          if (_isZeroCopy) {
10✔
126
            // Zero-copy путь
127
            _logger
2✔
128
                ?.internal('Zero-copy отправка ответа [streamId: $_streamId]');
6✔
129
            await _transport.sendDirectObject(
4✔
130
              _streamId,
2✔
131
              response,
132
            );
133
            _logger?.internal(
4✔
134
                'Zero-copy ответ отправлен для $_methodPath [streamId: $_streamId]');
6✔
135
          } else {
136
            // Сериализация для сетевых транспортов
137
            final serialized = _responseCodec!.serialize(response);
16✔
138
            _logger?.internal(
12✔
139
                'Ответ сериализован, размер: ${serialized.length} байт [streamId: $_streamId]');
12✔
140

141
            final framedMessage = RpcMessageFrame.encode(serialized);
8✔
142
            await _transport.sendMessage(_streamId, framedMessage);
24✔
143

144
            _logger?.internal(
12✔
145
                'Ответ отправлен для $_methodPath [streamId: $_streamId]');
12✔
146
          }
147
        } catch (e, stackTrace) {
148
          // Проверяем, не закрыт ли транспорт
UNCOV
149
          if (e.toString().contains('Transport is closed') ||
×
UNCOV
150
              e.toString().contains('closed')) {
×
UNCOV
151
            _logger?.internal(
×
UNCOV
152
                'Транспорт закрыт, пропускаем отправку ответа [streamId: $_streamId]');
×
153
            return;
154
          }
UNCOV
155
          _logger?.error('Ошибка при отправке ответа [streamId: $_streamId]',
×
156
              error: e, stackTrace: stackTrace);
157
        }
158
      },
159
      onDone: () async {
10✔
160
        if (!_isActive) return;
10✔
161

162
        try {
163
          final trailers = RpcMetadata.forTrailer(RpcStatus.OK);
9✔
164
          await _transport.sendMetadata(_streamId, trailers, endStream: true);
27✔
165
          _logger?.internal(
15✔
166
              'Трейлер отправлен для $_methodPath [streamId: $_streamId]');
18✔
167
        } catch (e, stackTrace) {
168
          // Проверяем, не закрыт ли транспорт
169
          if (e.toString().contains('Transport is closed') ||
×
170
              e.toString().contains('closed')) {
×
UNCOV
171
            _logger?.internal(
×
UNCOV
172
                'Транспорт закрыт, пропускаем отправку трейлера [streamId: $_streamId]');
×
173
            return;
174
          }
UNCOV
175
          _logger?.error('Ошибка при отправке трейлера [streamId: $_streamId]',
×
176
              error: e, stackTrace: stackTrace);
177
        }
178
      },
UNCOV
179
      onError: (error, stackTrace) {
×
UNCOV
180
        _logger?.error(
×
UNCOV
181
            'Ошибка в потоке ответов для $_methodPath [streamId: $_streamId]',
×
182
            error: error,
183
            stackTrace: stackTrace);
184
      },
185
    );
186
  }
187

188
  /// Привязывает процессор к потоку сообщений от endpoint'а
189
  void bindToMessageStream(Stream<RpcTransportMessage> messageStream) {
10✔
190
    if (_messageSubscription != null) {
10✔
191
      _logger?.logRpcWarning(
1✔
192
        message: 'Stream processor already bound to message stream',
UNCOV
193
        methodPath: _methodPath,
×
UNCOV
194
        streamId: _streamId,
×
195
      );
196
      return;
197
    }
198

199
    _logger?.logStreamBound(
16✔
200
      methodPath: _methodPath,
6✔
201
      streamId: _streamId,
6✔
202
    );
203

204
    _messageSubscription = messageStream.listen(
20✔
205
      _handleMessage,
10✔
UNCOV
206
      onError: (error, stackTrace) {
×
UNCOV
207
        _logger?.logRpcError(
×
208
          operation: 'message_stream_listen',
209
          error: error,
210
          stackTrace: stackTrace,
UNCOV
211
          methodPath: _methodPath,
×
UNCOV
212
          streamId: _streamId,
×
213
        );
UNCOV
214
        if (!_requestController.isClosed) {
×
UNCOV
215
          _requestController.addError(error, stackTrace);
×
216
        }
217
      },
218
      onDone: () {
7✔
219
        _logger?.logStreamFinished(
13✔
220
          methodPath: _methodPath,
6✔
221
          streamId: _streamId,
6✔
222
          reason: 'message_stream_completed',
223
        );
224
        if (!_requestController.isClosed) {
14✔
225
          _requestController.close();
14✔
226
        }
227
      },
228
    );
229

230
    // НЕ отправляем начальные метаданные при подключении
231
    // Они будут отправлены при первом успешном ответе
232
    // или пропущены при ошибке (error response отправляется напрямую)
233
  }
234

235
  /// Проверяет токен отмены и выбрасывает исключение если отменен
236
  void _checkCancellation() {
10✔
237
    _context?.cancellationToken?.throwIfCancelled();
16✔
238
  }
239

240
  /// Обрабатывает входящее сообщение
241
  void _handleMessage(RpcTransportMessage message) {
10✔
242
    if (!_isActive) return;
10✔
243

244
    // Проверяем отмену перед обработкой каждого сообщения
245
    try {
246
      _checkCancellation();
10✔
247
    } catch (e) {
UNCOV
248
      _logger?.internal(
×
UNCOV
249
        'Сообщение пропущено из-за отмены [streamId: $_streamId]',
×
250
      );
251
      return;
252
    }
253

254
    _logger?.logMessageReceived(
16✔
255
      streamId: message.streamId,
6✔
256
      messageType: message.isMetadataOnly
6✔
257
          ? 'metadata'
258
          : message.isDirect
6✔
259
              ? 'zero_copy'
260
              : 'serialized',
261
      payloadSize: message.payload?.length,
10✔
262
      isDirectPayload: message.isDirect,
6✔
263
    );
264

265
    // Zero-copy: обрабатываем прямой объект
266
    if (message.isDirect && message.directPayload != null) {
12✔
267
      _processDirectMessage(message.directPayload!);
4✔
268
    }
269
    // Обрабатываем сообщения с данными (стандартная сериализация)
270
    else if (!message.isMetadataOnly && message.payload != null) {
16✔
271
      _processDataMessage(message.payload!);
16✔
272
    }
273

274
    // Обрабатываем конец потока
275
    if (message.isEndOfStream) {
10✔
276
      _logger?.logStreamFinished(
15✔
277
        methodPath: _methodPath,
6✔
278
        streamId: _streamId,
6✔
279
        reason: 'end_of_stream_received',
280
      );
281
      if (!_requestController.isClosed) {
18✔
282
        _requestController.close();
18✔
283
      }
284
    }
285
  }
286

287
  /// Zero-copy: обрабатывает прямой объект без сериализации
288
  void _processDirectMessage(Object directPayload) {
2✔
289
    try {
290
      final request = directPayload as TRequest;
291

292
      if (!_requestController.isClosed) {
4✔
293
        _requestController.add(request);
4✔
294
      } else {
UNCOV
295
        _logger?.logRpcWarning(
×
296
          message: 'Cannot add request to closed controller (zero-copy)',
297
          methodPath: _methodPath,
×
UNCOV
298
          streamId: _streamId,
×
UNCOV
299
          metadata: {'transport_type': 'zero_copy'},
×
300
        );
301
      }
302
    } catch (e, stackTrace) {
UNCOV
303
      _logger?.logRpcError(
×
304
        operation: 'zero_copy_direct_object_processing',
305
        error: e,
306
        stackTrace: stackTrace,
UNCOV
307
        methodPath: _methodPath,
×
UNCOV
308
        streamId: _streamId,
×
UNCOV
309
        metadata: {'object_type': directPayload.runtimeType.toString()},
×
310
      );
UNCOV
311
      if (!_requestController.isClosed) {
×
312
        _requestController.addError(e, stackTrace);
×
313
      }
314
    }
315
  }
316

317
  /// Обрабатывает сообщение с данными (только для режима сериализации)
318
  void _processDataMessage(List<int> messageBytes) {
8✔
319
    if (_isZeroCopy) {
8✔
UNCOV
320
      _logger?.logRpcWarning(
×
321
        message: 'Serialized message received in zero-copy mode, ignoring',
UNCOV
322
        methodPath: _methodPath,
×
323
        streamId: _streamId,
×
324
      );
325
      return;
326
    }
327

328
    _logger?.logMessageReceived(
12✔
329
      streamId: _streamId,
4✔
330
      messageType: 'serialized_data',
331
      payloadSize: messageBytes.length,
4✔
332
    );
333

334
    try {
335
      // Конвертируем List<int> в Uint8List для парсера
336
      final uint8Message = messageBytes is Uint8List
8✔
337
          ? messageBytes
UNCOV
338
          : Uint8List.fromList(messageBytes);
×
339

340
      final messages = _parser!(uint8Message);
16✔
341

342
      for (var msgBytes in messages) {
16✔
343
        try {
344
          final request = _requestCodec!.deserialize(msgBytes);
16✔
345

346
          if (!_requestController.isClosed) {
16✔
347
            _requestController.add(request);
16✔
348
          } else {
349
            _logger?.logRpcWarning(
×
350
              message: 'Cannot add request to closed controller',
351
              methodPath: _methodPath,
×
UNCOV
352
              streamId: _streamId,
×
353
              metadata: {'message_size': msgBytes.length},
×
354
            );
355
          }
356
        } catch (e, stackTrace) {
UNCOV
357
          _logger?.logRpcError(
×
358
            operation: 'request_deserialization',
359
            error: e,
360
            stackTrace: stackTrace,
UNCOV
361
            methodPath: _methodPath,
×
UNCOV
362
            streamId: _streamId,
×
UNCOV
363
            metadata: {'message_size': msgBytes.length},
×
364
          );
UNCOV
365
          if (!_requestController.isClosed) {
×
UNCOV
366
            _requestController.addError(e, stackTrace);
×
367
          }
368
        }
369
      }
370
    } catch (e, stackTrace) {
UNCOV
371
      _logger?.logRpcError(
×
372
        operation: 'message_parsing',
373
        error: e,
374
        stackTrace: stackTrace,
UNCOV
375
        methodPath: _methodPath,
×
UNCOV
376
        streamId: _streamId,
×
UNCOV
377
        metadata: {'message_size': messageBytes.length},
×
378
      );
UNCOV
379
      if (!_requestController.isClosed) {
×
UNCOV
380
        _requestController.addError(e, stackTrace);
×
381
      }
382
    }
383
  }
384

385
  /// Отправляет ответ клиенту
386
  Future<void> send(TResponse response) async {
10✔
387
    if (!_isActive) {
10✔
388
      _logger?.warning('Попытка отправить ответ в неактивный процессор');
2✔
389
      return;
390
    }
391

392
    // Проверяем отмену перед отправкой ответа
393
    try {
394
      _checkCancellation();
10✔
395
    } catch (e) {
UNCOV
396
      _logger?.internal(
×
UNCOV
397
        'Ответ не отправлен из-за отмены [streamId: $_streamId]',
×
398
      );
399
      return;
400
    }
401

402
    if (!_responseController.isClosed) {
20✔
403
      _responseController.add(response);
20✔
404
    } else {
UNCOV
405
      _logger?.warning('Попытка отправить ответ в закрытый контроллер');
×
406
    }
407
  }
408

409
  /// Отправляет ошибку клиенту
410
  Future<void> sendError(int statusCode, String message) async {
3✔
411
    if (!_isActive) {
3✔
412
      _logger?.warning('Попытка отправить ошибку в неактивный процессор');
1✔
413
      return;
414
    }
415

416
    _logger?.error(
3✔
UNCOV
417
        'Отправка ошибки клиенту: $statusCode - $message [streamId: $_streamId]');
×
418

419
    if (!_responseController.isClosed) {
6✔
420
      _responseController.close();
6✔
421
    }
422

423
    try {
424
      // Если начальные метаданные не были отправлены, отправляем error response сразу
425
      if (!_initialMetadataSent) {
3✔
426
        _logger?.internal(
3✔
UNCOV
427
            'Отправка ошибки без начальных метаданных [streamId: $_streamId]');
×
428
        // Создаем комбинированные метаданные: начальный response + error trailer
429
        final errorHeaders = [
3✔
430
          RpcHeader(':status', '200'), // HTTP 200 для gRPC
3✔
431
          RpcHeader(
3✔
432
              RpcConstants.CONTENT_TYPE_HEADER, RpcConstants.GRPC_CONTENT_TYPE),
433
          RpcHeader(RpcConstants.GRPC_STATUS_HEADER, statusCode.toString()),
6✔
434
        ];
435

436
        if (message.isNotEmpty) {
3✔
437
          errorHeaders
438
              .add(RpcHeader(RpcConstants.GRPC_MESSAGE_HEADER, message));
6✔
439
        }
440

441
        final errorMetadata = RpcMetadata(errorHeaders);
3✔
442
        await _transport.sendMetadata(_streamId, errorMetadata,
9✔
443
            endStream: true);
444
        _initialMetadataSent = true;
3✔
445
      } else {
446
        // Начальные метаданные уже отправлены, отправляем только trailer
UNCOV
447
        final trailers = RpcMetadata.forTrailer(statusCode, message: message);
×
448
        await _transport.sendMetadata(_streamId, trailers, endStream: true);
×
449
      }
450

451
      _logger?.internal('Ошибка отправлена клиенту [streamId: $_streamId]');
3✔
452
    } catch (e, stackTrace) {
453
      // Проверяем, не закрыт ли транспорт
UNCOV
454
      if (e.toString().contains('Transport is closed') ||
×
UNCOV
455
          e.toString().contains('closed')) {
×
UNCOV
456
        _logger?.internal(
×
UNCOV
457
            'Транспорт закрыт, пропускаем отправку ошибки [streamId: $_streamId]');
×
458
        return;
459
      }
UNCOV
460
      _logger?.error(
×
UNCOV
461
          'Ошибка при отправке ошибки клиенту [streamId: $_streamId]',
×
462
          error: e,
463
          stackTrace: stackTrace);
464
    }
465
  }
466

467
  /// Завершает отправку ответов
468
  Future<void> finishSending() async {
9✔
469
    if (!_isActive) return;
9✔
470

471
    _logger?.internal(
15✔
472
        'Завершение отправки ответов для $_methodPath [streamId: $_streamId]');
18✔
473

474
    if (!_responseController.isClosed) {
18✔
475
      await _responseController.close();
18✔
476
    }
477
  }
478

479
  /// Закрывает процессор и освобождает ресурсы
480
  Future<void> close() async {
4✔
481
    if (!_isActive) return;
4✔
482

483
    _logger?.internal(
4✔
UNCOV
484
        'Закрытие StreamProcessor для $_methodPath [streamId: $_streamId]');
×
485
    _isActive = false;
4✔
486

487
    // Отменяем все подписки
488
    await _messageSubscription?.cancel();
8✔
489
    _messageSubscription = null;
4✔
490

491
    await _cancellationSubscription?.cancel();
4✔
492
    _cancellationSubscription = null;
4✔
493

494
    if (!_requestController.isClosed) {
8✔
495
      _requestController.close();
8✔
496
    }
497

498
    if (!_responseController.isClosed) {
8✔
499
      _responseController.close();
8✔
500
    }
501
  }
502

503
  /// Настраивает мониторинг отмены операции
504
  void _setupCancellationMonitoring() {
10✔
505
    if (_context?.cancellationToken != null) {
16✔
UNCOV
506
      _cancellationSubscription =
×
UNCOV
507
          _context!.cancellationToken!.cancelled.asStream().listen(
×
UNCOV
508
        (_) {
×
UNCOV
509
          _logger?.internal(
×
UNCOV
510
            'Операция отменена, закрываем процессор [streamId: $_streamId]',
×
511
          );
UNCOV
512
          _isActive = false;
×
513

514
          final reason =
UNCOV
515
              _context!.cancellationToken!.reason ?? 'Operation was cancelled';
×
UNCOV
516
          final cancelledException = RpcCancelledException(reason);
×
517

UNCOV
518
          if (!_requestController.isClosed) {
×
UNCOV
519
            _requestController.addError(cancelledException);
×
520
          }
UNCOV
521
          if (!_responseController.isClosed) {
×
UNCOV
522
            _responseController.addError(cancelledException);
×
523
          }
524

525
          // Отменяем подписки
UNCOV
526
          _messageSubscription?.cancel();
×
UNCOV
527
          _cancellationSubscription?.cancel();
×
528
        },
UNCOV
529
        onError: (error, stackTrace) {
×
UNCOV
530
          _logger?.error(
×
UNCOV
531
            'Ошибка при мониторинге отмены [streamId: $_streamId]',
×
532
            error: error,
533
            stackTrace: stackTrace,
534
          );
535
        },
536
      );
537
    }
538
  }
539
}
540

541
/// Универсальный процессор для клиентских вызовов RPC стримов.
542
///
543
/// Автоматически определяет режим работы:
544
/// - Zero-copy для RpcInMemoryTransport (кодеки не нужны)
545
/// - Сериализация для сетевых транспортов (кодеки обязательны)
546
///
547
/// Преимущества:
548
/// - Переиспользование кода между типами стримов
549
/// - Отсутствие race condition
550
/// - Четкое разделение ответственности
551
/// - Тестируемость без внепроцессных зависимостей
552
/// - Работа с любыми типами объектов (не только IRpcSerializable)
553
/// - Автоматическая оптимизация для in-memory транспорта
554
final class CallProcessor<TRequest extends Object, TResponse extends Object> {
555
  final RpcLogger? _logger;
556
  final IRpcTransport _transport;
557
  final int _streamId;
558
  final String _serviceName;
559
  final String _methodName;
560
  final IRpcCodec<TRequest>? _requestCodec;
561
  final IRpcCodec<TResponse>? _responseCodec;
562

563
  /// RPC контекст для передачи метаданных, таймаутов и отмены
564
  final RpcContext? _context;
565

566
  /// Подписка на отмену операции
567
  StreamSubscription? _cancellationSubscription;
568

569
  /// Парсер для обработки фрагментированных сообщений (только для сериализации)
570
  RpcMessageParser? _parser;
571

572
  /// Режим работы процессора
573
  final bool _isZeroCopy;
574

575
  /// Контроллер потока исходящих запросов
576
  final StreamController<TRequest> _requestController =
577
      StreamController<TRequest>();
578

579
  /// Контроллер потока входящих ответов
580
  final StreamController<RpcMessage<TResponse>> _responseController =
581
      StreamController<RpcMessage<TResponse>>();
582

583
  /// Подписка на исходящие запросы
584
  StreamSubscription? _requestSubscription;
585

586
  /// Подписка на входящие ответы
587
  StreamSubscription? _responseSubscription;
588

589
  /// Флаг активности процессора
590
  bool _isActive = true;
591

592
  /// Флаг отправки начальных метаданных
593
  bool _initialMetadataSent = false;
594

595
  /// Путь метода в формате /ServiceName/MethodName
596
  late final String _methodPath;
597

598
  CallProcessor({
10✔
599
    required IRpcTransport transport,
600
    required String serviceName,
601
    required String methodName,
602
    IRpcCodec<TRequest>? requestCodec,
603
    IRpcCodec<TResponse>? responseCodec,
604
    RpcContext? context,
605
    RpcLogger? logger,
606
  })  : _transport = transport,
607
        _streamId = transport.createStream(),
10✔
608
        _serviceName = serviceName,
609
        _methodName = methodName,
610
        _isZeroCopy = requestCodec == null && responseCodec == null,
611
        _requestCodec = requestCodec,
612
        _responseCodec = responseCodec,
613
        _context = context,
614
        _logger = logger?.child('CallProcessor') {
6✔
615
    // Валидация: для режима сериализации кодеки обязательны
616
    if (!_isZeroCopy) {
10✔
617
      if (_requestCodec == null || _responseCodec == null) {
16✔
UNCOV
618
        throw ArgumentError(
×
619
          'Кодеки обязательны для режима сериализации. '
620
          'Для zero-copy не передавайте кодеки (null).',
621
        );
622
      }
623
      _parser = RpcMessageParser(logger: _logger);
24✔
624
    } else {
625
      // Zero-copy режим: требуется поддержка zero-copy транспортом
626
      if (!transport.supportsZeroCopy) {
2✔
UNCOV
627
        throw ArgumentError(
×
628
          'Zero-copy режим требует транспорт с поддержкой zero-copy. '
629
          'Для сетевых транспортов передайте кодеки.',
630
        );
631
      }
632
    }
633

634
    _methodPath = '/$_serviceName/$_methodName';
40✔
635

636
    _logger?.internal(
16✔
637
        'Создан ${_isZeroCopy ? "Zero-copy" : "Serialized"} CallProcessor для $_methodPath [streamId: $_streamId]${_context?.cancellationToken != null ? " с cancellation token" : ""}');
36✔
638

639
    // Проверяем контекст перед началом работы
640
    _checkContextBeforeCall();
10✔
641

642
    _setupCancellationMonitoring();
10✔
643
    _setupRequestHandler();
10✔
644
    _setupResponseHandler();
10✔
645
  }
646

647
  /// Поток входящих ответов от сервера
648
  Stream<RpcMessage<TResponse>> get responses => _responseController.stream;
30✔
649

650
  /// Активен ли процессор
651
  bool get isActive => _isActive;
2✔
652

653
  /// ID стрима
654
  int get streamId => _streamId;
2✔
655

656
  /// Режим zero-copy
UNCOV
657
  bool get isZeroCopy => _isZeroCopy;
×
658

659
  /// Настраивает обработку исходящих запросов
660
  void _setupRequestHandler() {
10✔
661
    _requestSubscription = _requestController.stream.listen(
40✔
662
      (request) async {
10✔
663
        if (!_isActive) return;
10✔
664

665
        try {
666
          // Отправляем начальные метаданные при первом запросе
667
          if (!_initialMetadataSent) {
10✔
668
            await _sendInitialMetadata();
10✔
669
            _initialMetadataSent = true;
10✔
670
          }
671

672
          _logger?.internal(
16✔
673
              'Отправка запроса для $_methodPath [streamId: $_streamId]');
18✔
674

675
          if (_isZeroCopy) {
10✔
676
            // Zero-copy путь
677
            _logger
2✔
678
                ?.internal('Zero-copy отправка запроса [streamId: $_streamId]');
6✔
679
            await _transport.sendDirectObject(
4✔
680
              _streamId,
2✔
681
              request,
682
            );
683
            _logger?.internal(
4✔
684
                'Zero-copy запрос отправлен для $_methodPath [streamId: $_streamId]');
6✔
685
          } else {
686
            // Сериализация для сетевых транспортов
687
            final serialized = _requestCodec!.serialize(request);
16✔
688
            _logger?.internal(
12✔
689
                'Запрос сериализован, размер: ${serialized.length} байт [streamId: $_streamId]');
12✔
690

691
            final framedMessage = RpcMessageFrame.encode(serialized);
8✔
692
            await _transport.sendMessage(_streamId, framedMessage);
24✔
693

694
            _logger?.internal(
12✔
695
                'Запрос отправлен для $_methodPath [streamId: $_streamId]');
12✔
696
          }
697
        } catch (e, stackTrace) {
698
          _logger?.error('Ошибка при отправке запроса [streamId: $_streamId]',
4✔
699
              error: e, stackTrace: stackTrace);
700
          if (!_responseController.isClosed) {
2✔
701
            _responseController.addError(e, stackTrace);
2✔
702
          }
703

704
          // 🔥 КРИТИЧЕСКОЕ ИСПРАВЛЕНИЕ: При ошибке роутинга немедленно завершаем обработку
705
          // Это предотвратит дальнейшую отправку запросов и заставит stream завершиться с ошибкой
706
          if (!_requestController.isClosed) {
2✔
UNCOV
707
            _requestController.close();
×
708
          }
709
        }
710
      },
711
      onDone: () async {
9✔
712
        if (!_isActive) return;
9✔
713

714
        try {
715
          await _transport.finishSending(_streamId);
27✔
716
          _logger?.internal(
15✔
717
              'finishSending выполнен для $_methodPath [streamId: $_streamId]');
18✔
718
        } catch (e, stackTrace) {
UNCOV
719
          _logger?.error(
×
UNCOV
720
              'Ошибка при завершении отправки запросов [streamId: $_streamId]',
×
721
              error: e,
722
              stackTrace: stackTrace);
723
        }
724
      },
UNCOV
725
      onError: (error, stackTrace) {
×
UNCOV
726
        _logger?.error(
×
UNCOV
727
            'Ошибка в потоке запросов для $_methodPath [streamId: $_streamId]',
×
728
            error: error,
729
            stackTrace: stackTrace);
UNCOV
730
        if (!_responseController.isClosed) {
×
731
          _responseController.addError(error, stackTrace);
×
732
        }
733
      },
734
    );
735
  }
736

737
  /// Настраивает обработку входящих ответов
738
  void _setupResponseHandler() {
10✔
739
    _responseSubscription = _transport.getMessagesForStream(_streamId).listen(
50✔
740
      _handleResponse,
10✔
UNCOV
741
      onError: (error, stackTrace) {
×
UNCOV
742
        _logger?.error('Ошибка в потоке ответов',
×
743
            error: error, stackTrace: stackTrace);
UNCOV
744
        if (!_responseController.isClosed) {
×
UNCOV
745
          _responseController.addError(error, stackTrace);
×
746
        }
747
      },
748
      onDone: () {
7✔
749
        _logger?.internal(
13✔
750
            'Поток ответов завершен для $_methodPath [streamId: $_streamId]');
18✔
751
        if (!_responseController.isClosed) {
14✔
752
          _responseController.close();
2✔
753
        }
754
      },
755
    );
756
  }
757

758
  /// Отправляет начальные метаданные с поддержкой контекста
759
  Future<void> _sendInitialMetadata() async {
10✔
760
    _logger?.internal(
16✔
761
        'Отправка начальных метаданных для $_methodPath [streamId: $_streamId]');
18✔
762

763
    final baseMetadata =
764
        RpcMetadata.forClientRequest(_serviceName, _methodName);
30✔
765

766
    // Создаем новые метаданные с заголовками из контекста
767
    final headers = List<RpcHeader>.from(baseMetadata.headers);
20✔
768

769
    if (_context != null) {
10✔
770
      // Добавляем пользовательские заголовки из контекста
771
      for (final entry in _context!.headers.entries) {
24✔
772
        headers.add(RpcHeader(entry.key, entry.value));
24✔
773
      }
774

775
      // Добавляем специальные заголовки RPC
776
      if (_context!.traceId != null) {
12✔
777
        headers.add(RpcHeader('x-trace-id', _context!.traceId!));
24✔
778
      }
779
      headers.add(RpcHeader('x-request-id', _context!.requestId));
24✔
780

781
      // Передаем deadline серверу
782
      if (_context!.deadline != null) {
12✔
NEW
783
        headers.add(RpcHeader('x-deadline',
×
NEW
784
            _context!.deadline!.millisecondsSinceEpoch.toString()));
×
785
      }
786

787
      // Context values остаются ЛОКАЛЬНЫМИ - не передаются через сеть (соответствует стандарту gRPC)
788
      // Только headers передаются через HTTP/2 заголовки
789

790
      _logger?.internal(
12✔
791
          'Добавлены заголовки контекста: ${_context!.headers.length} пользовательских + системные [streamId: $_streamId]');
30✔
792
    } else {
793
      // Даже для null контекста добавляем базовый request-id
794
      final requestId =
795
          RpcContext.empty().requestId; // Генерируем базовый request-id
8✔
796
      headers.add(RpcHeader('x-request-id', requestId));
8✔
797

798
      _logger?.internal(
4✔
UNCOV
799
          'Добавлен базовый request-id для null контекста [streamId: $_streamId]');
×
800
    }
801

802
    final metadata = RpcMetadata(headers);
10✔
803
    await _transport.sendMetadata(_streamId, metadata);
30✔
804

805
    _logger?.internal(
16✔
806
        'Начальные метаданные отправлены для $_methodPath [streamId: $_streamId]');
18✔
807
  }
808

809
  /// Настраивает мониторинг отмены операции для CallProcessor
810
  void _setupCancellationMonitoring() {
10✔
811
    if (_context?.cancellationToken != null) {
16✔
812
      _cancellationSubscription =
6✔
813
          _context!.cancellationToken!.cancelled.asStream().listen(
30✔
814
        (_) async {
×
UNCOV
815
          _logger?.internal(
×
UNCOV
816
            'Операция отменена клиентом, отправляем уведомление серверу [streamId: $_streamId]',
×
817
          );
818

819
          try {
820
            // Отправляем специальное сообщение отмены серверу
UNCOV
821
            final reason = _context!.cancellationToken!.reason ??
×
822
                'Operation cancelled by client';
UNCOV
823
            await _sendCancellationToServer(reason);
×
824
          } catch (e, stackTrace) {
UNCOV
825
            _logger?.error(
×
826
              'Ошибка при отправке уведомления об отмене [streamId: $_streamId]',
×
827
              error: e,
828
              stackTrace: stackTrace,
829
            );
830
          }
831

UNCOV
832
          _isActive = false;
×
UNCOV
833
          final cancelledException = RpcCancelledException(
×
UNCOV
834
              _context!.cancellationToken!.reason ?? 'Operation was cancelled');
×
835

UNCOV
836
          if (!_requestController.isClosed) {
×
UNCOV
837
            _requestController.addError(cancelledException);
×
838
          }
UNCOV
839
          if (!_responseController.isClosed) {
×
840
            _responseController.addError(cancelledException);
×
841
          }
842

843
          // Отменяем подписки
UNCOV
844
          await _requestSubscription?.cancel();
×
UNCOV
845
          await _responseSubscription?.cancel();
×
UNCOV
846
          _cancellationSubscription?.cancel();
×
847
        },
UNCOV
848
        onError: (error, stackTrace) {
×
UNCOV
849
          _logger?.error(
×
UNCOV
850
            'Ошибка при мониторинге отмены [streamId: $_streamId]',
×
851
            error: error,
852
            stackTrace: stackTrace,
853
          );
854
        },
855
      );
856
    }
857
  }
858

859
  /// Отправляет уведомление об отмене серверу
860
  Future<void> _sendCancellationToServer(String reason) async {
×
861
    try {
862
      // Создаем специальные метаданные с уведомлением об отмене
863
      final cancellationHeaders = [
×
864
        RpcHeader('x-client-cancelled', 'true'),
×
UNCOV
865
        RpcHeader('x-cancellation-reason', reason),
×
UNCOV
866
        RpcHeader(
×
867
            RpcConstants.GRPC_STATUS_HEADER, RpcStatus.CANCELLED.toString()),
×
868
      ];
869

UNCOV
870
      final cancellationMetadata = RpcMetadata(cancellationHeaders);
×
871

UNCOV
872
      _logger?.internal(
×
873
        'Отправка уведомления об отмене серверу [streamId: $_streamId]',
×
874
      );
875

876
      await _transport.sendMetadata(_streamId, cancellationMetadata,
×
877
          endStream: true);
878

UNCOV
879
      _logger?.internal(
×
UNCOV
880
        'Уведомление об отмене отправлено серверу [streamId: $_streamId]',
×
881
      );
882
    } catch (e, stackTrace) {
UNCOV
883
      _logger?.error(
×
UNCOV
884
        'Ошибка при отправке метаданных отмены [streamId: $_streamId]',
×
885
        error: e,
886
        stackTrace: stackTrace,
887
      );
888
    }
889
  }
890

891
  /// Проверяет состояние контекста перед вызовом
892
  void _checkContextBeforeCall() {
10✔
893
    if (_context == null) return;
10✔
894

895
    // Проверяем отмену
896
    _context!.cancellationToken?.throwIfCancelled();
18✔
897

898
    // Проверяем deadline
899
    if (_context!.isExpired) {
12✔
UNCOV
900
      throw RpcDeadlineExceededException(
×
UNCOV
901
        _context!.deadline!,
×
902
        Duration.zero,
903
      );
904
    }
905

906
    _logger?.internal(
12✔
907
        'Контекст проверен: requestId=${_context!.requestId}, traceId=${_context!.traceId} [streamId: $_streamId]');
36✔
908
  }
909

910
  /// Обрабатывает входящий ответ
911
  void _handleResponse(RpcTransportMessage message) {
10✔
912
    if (!_isActive) return;
10✔
913

914
    _logger?.internal(
16✔
915
        'Обработка ответа [streamId: ${message.streamId}, isMetadataOnly: ${message.isMetadataOnly}, hasPayload: ${message.payload != null}, isDirect: ${message.isDirect}]');
30✔
916

917
    try {
918
      // Обрабатываем метаданные
919
      if (message.isMetadataOnly) {
10✔
920
        final rpcMessage = RpcMessage.withMetadata<TResponse>(
9✔
921
          message.metadata!,
9✔
922
          isEndOfStream: message.isEndOfStream,
9✔
923
        );
924

925
        if (!_responseController.isClosed) {
18✔
926
          _responseController.add(rpcMessage);
18✔
927
          _logger?.internal(
15✔
928
              'Метаданные добавлены в поток ответов [streamId: $_streamId]');
12✔
929
        }
930
      }
931

932
      // Zero-copy: обрабатываем прямой объект
933
      if (message.isDirect && message.directPayload != null) {
12✔
934
        _processDirectResponse(message.directPayload!);
4✔
935
      }
936
      // Обрабатываем сообщения с данными (стандартная сериализация)
937
      else if (!message.isMetadataOnly && message.payload != null) {
18✔
938
        _processResponseData(message.payload!);
16✔
939
      }
940

941
      // Завершаем поток при получении END_STREAM
942
      if (message.isEndOfStream) {
10✔
943
        _logger?.internal(
15✔
944
            'Получен END_STREAM, закрываем поток ответов [streamId: $_streamId]');
12✔
945
        if (!_responseController.isClosed) {
18✔
946
          _responseController.close();
18✔
947
        }
948
      }
949
    } catch (e, stackTrace) {
UNCOV
950
      _logger?.error('Ошибка при обработке ответа [streamId: $_streamId]',
×
951
          error: e, stackTrace: stackTrace);
UNCOV
952
      if (!_responseController.isClosed) {
×
UNCOV
953
        _responseController.addError(e, stackTrace);
×
954
      }
955
    }
956
  }
957

958
  /// Zero-copy: обрабатывает прямой объект ответа без сериализации
959
  void _processDirectResponse(Object directPayload) {
2✔
960
    _logger?.internal(
4✔
961
        'Zero-copy обработка прямого ответа [streamId: $_streamId, type: ${directPayload.runtimeType}]');
6✔
962

963
    try {
964
      final response = directPayload as TResponse;
965
      final rpcMessage = RpcMessage.withPayload<TResponse>(response);
2✔
966

967
      if (!_responseController.isClosed) {
4✔
968
        _responseController.add(rpcMessage);
4✔
969
        _logger?.internal(
4✔
970
            'Zero-copy ответ добавлен в поток ответов [streamId: $_streamId]');
4✔
971
      } else {
UNCOV
972
        _logger?.warning(
×
UNCOV
973
            'Zero-copy: не могу добавить ответ в закрытый контроллер [streamId: $_streamId]');
×
974
      }
975
    } catch (e, stackTrace) {
UNCOV
976
      _logger?.error(
×
UNCOV
977
          'Zero-copy ошибка при обработке прямого ответа [streamId: $_streamId]',
×
978
          error: e,
979
          stackTrace: stackTrace);
UNCOV
980
      if (!_responseController.isClosed) {
×
UNCOV
981
        _responseController.addError(e, stackTrace);
×
982
      }
983
    }
984
  }
985

986
  /// Обрабатывает данные ответа (только для режима сериализации)
987
  void _processResponseData(List<int> messageBytes) {
8✔
988
    if (_isZeroCopy) {
8✔
UNCOV
989
      _logger?.logRpcWarning(
×
990
        message: 'Serialized response received in zero-copy mode, ignoring',
UNCOV
991
        methodPath: _methodPath,
×
UNCOV
992
        streamId: _streamId,
×
993
      );
994
      return;
995
    }
996

997
    _logger?.internal(
12✔
998
        'Получен ответ размером: ${messageBytes.length} байт [streamId: $_streamId]');
12✔
999

1000
    try {
1001
      final uint8Message = messageBytes is Uint8List
8✔
1002
          ? messageBytes
UNCOV
1003
          : Uint8List.fromList(messageBytes);
×
1004

1005
      final messages = _parser!(uint8Message);
16✔
1006
      _logger?.internal(
12✔
1007
          'Парсер извлек ${messages.length} сообщений из фрейма [streamId: $_streamId]');
12✔
1008

1009
      for (var msgBytes in messages) {
16✔
1010
        try {
1011
          _logger?.internal(
12✔
1012
              'Десериализация ответа размером ${msgBytes.length} байт [streamId: $_streamId]');
12✔
1013
          final response = _responseCodec!.deserialize(msgBytes);
16✔
1014

1015
          final rpcMessage = RpcMessage.withPayload<TResponse>(response);
8✔
1016

1017
          if (!_responseController.isClosed) {
16✔
1018
            _responseController.add(rpcMessage);
16✔
1019
            _logger?.internal(
12✔
1020
                'Ответ десериализован и добавлен в поток ответов [streamId: $_streamId]');
8✔
1021
          } else {
UNCOV
1022
            _logger?.warning(
×
UNCOV
1023
                'Не могу добавить ответ в закрытый контроллер [streamId: $_streamId]');
×
1024
          }
1025
        } catch (e, stackTrace) {
UNCOV
1026
          _logger?.error(
×
UNCOV
1027
              'Ошибка при десериализации ответа [streamId: $_streamId]',
×
1028
              error: e,
1029
              stackTrace: stackTrace);
UNCOV
1030
          if (!_responseController.isClosed) {
×
UNCOV
1031
            _responseController.addError(e, stackTrace);
×
1032
          }
1033
        }
1034
      }
1035
    } catch (e, stackTrace) {
UNCOV
1036
      _logger?.error('Ошибка при парсинге ответа [streamId: $_streamId]',
×
1037
          error: e, stackTrace: stackTrace);
UNCOV
1038
      if (!_responseController.isClosed) {
×
UNCOV
1039
        _responseController.addError(e, stackTrace);
×
1040
      }
1041
    }
1042
  }
1043

1044
  /// Отправляет запрос серверу
1045
  Future<void> send(TRequest request) async {
10✔
1046
    if (!_isActive) {
10✔
1047
      _logger?.warning('Попытка отправить запрос в неактивный процессор');
1✔
1048
      return;
1049
    }
1050

1051
    if (!_requestController.isClosed) {
20✔
1052
      _requestController.add(request);
20✔
1053
    } else {
UNCOV
1054
      _logger?.warning('Попытка отправить запрос в закрытый контроллер');
×
1055
    }
1056
  }
1057

1058
  /// Завершает отправку запросов
1059
  Future<void> finishSending() async {
9✔
1060
    if (!_isActive) return;
9✔
1061

1062
    _logger?.internal(
15✔
1063
        'Завершение отправки запросов для $_methodPath [streamId: $_streamId]');
18✔
1064

1065
    if (!_requestController.isClosed) {
18✔
1066
      await _requestController.close();
18✔
1067
    }
1068
  }
1069

1070
  /// Закрывает процессор и освобождает ресурсы
1071
  Future<void> close() async {
10✔
1072
    if (!_isActive) return;
10✔
1073

1074
    _logger?.internal(
16✔
1075
        'Закрытие CallProcessor для $_methodPath [streamId: $_streamId]');
18✔
1076
    _isActive = false;
10✔
1077

1078
    await _requestSubscription?.cancel();
20✔
1079
    _requestSubscription = null;
10✔
1080

1081
    await _responseSubscription?.cancel();
20✔
1082
    _responseSubscription = null;
10✔
1083

1084
    await _cancellationSubscription?.cancel();
16✔
1085
    _cancellationSubscription = null;
10✔
1086

1087
    if (!_requestController.isClosed) {
20✔
1088
      _requestController.close();
20✔
1089
    }
1090

1091
    if (!_responseController.isClosed) {
20✔
1092
      _responseController.close();
20✔
1093
    }
1094
  }
1095
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc