• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nogipx / rpc_dart / 15713458685

17 Jun 2025 04:57PM UTC coverage: 79.658% (-0.7%) from 80.314%
15713458685

push

github

nogipx
implement zero-copy for in-memory

142 of 204 new or added lines in 7 files covered. (69.61%)

15 existing lines in 3 files now uncovered.

2612 of 3279 relevant lines covered (79.66%)

6.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.51
/lib/src/rpc/streams/base_processor.dart
1
// SPDX-FileCopyrightText: 2025 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
2
//
3
// SPDX-License-Identifier: LGPL-3.0-or-later
4

5
part of '_index.dart';
6

7
/// Базовый процессор для обработки стримов без прямой зависимости от транспорта.
8
///
9
/// Принимает поток входящих сообщений от endpoint'а и обрабатывает их,
10
/// предоставляя унифицированный интерфейс для всех типов RPC стримов.
11
///
12
/// Преимущества:
13
/// - Нет race condition с транспортом
14
/// - Переиспользование логики между типами стримов
15
/// - Четкое разделение ответственности
16
final class StreamProcessor<TRequest extends IRpcSerializable,
17
    TResponse extends IRpcSerializable> {
18
  final RpcLogger? _logger;
19
  final IRpcTransport _transport;
20
  final int _streamId;
21
  final String _serviceName;
22
  final String _methodName;
23
  final IRpcCodec<TRequest> _requestCodec;
24
  final IRpcCodec<TResponse> _responseCodec;
25

26
  /// Парсер для обработки фрагментированных сообщений
27
  late final RpcMessageParser _parser;
28

29
  /// Контроллер потока входящих запросов
30
  final StreamController<TRequest> _requestController =
31
      StreamController<TRequest>();
32

33
  /// Контроллер потока исходящих ответов
34
  final StreamController<TResponse> _responseController =
35
      StreamController<TResponse>();
36

37
  /// Подписка на входящий поток сообщений
38
  StreamSubscription? _messageSubscription;
39

40
  /// Флаг активности процессора
41
  bool _isActive = true;
42

43
  /// Флаг отправки начальных метаданных
44
  bool _initialMetadataSent = false;
45

46
  /// Путь метода в формате /ServiceName/MethodName
47
  late final String _methodPath;
48

49
  StreamProcessor({
13✔
50
    required IRpcTransport transport,
51
    required int streamId,
52
    required String serviceName,
53
    required String methodName,
54
    required IRpcCodec<TRequest> requestCodec,
55
    required IRpcCodec<TResponse> responseCodec,
56
    RpcLogger? logger,
57
  })  : _transport = transport,
58
        _streamId = streamId,
59
        _serviceName = serviceName,
60
        _methodName = methodName,
61
        _requestCodec = requestCodec,
62
        _responseCodec = responseCodec,
63
        _logger = logger?.child('StreamProcessor') {
7✔
64
    _parser = RpcMessageParser(logger: _logger);
39✔
65
    _methodPath = '/$_serviceName/$_methodName';
52✔
66

67
    _logger?.internal(
20✔
68
        'Создан StreamProcessor для $_methodPath [streamId: $_streamId]');
21✔
69
    _setupResponseHandler();
13✔
70
  }
71

72
  /// Поток входящих запросов от клиента
73
  Stream<TRequest> get requests => _requestController.stream;
39✔
74

75
  /// Активен ли процессор
76
  bool get isActive => _isActive;
6✔
77

78
  /// Настраивает обработку исходящих ответов
79
  void _setupResponseHandler() {
13✔
80
    _responseController.stream.listen(
39✔
81
      (response) async {
13✔
82
        if (!_isActive) return;
13✔
83

84
        _logger?.internal(
20✔
85
            'Отправка ответа для $_methodPath [streamId: $_streamId]');
21✔
86
        try {
87
          // 🚀 Zero-copy оптимизация для inmemory транспорта
88
          if (_transport is RpcInMemoryTransport) {
26✔
89
            _logger?.internal(
20✔
90
                '🚀 Zero-copy отправка ответа [streamId: $_streamId]');
14✔
91
            await (_transport as RpcInMemoryTransport).sendDirectObject(
26✔
92
              _streamId,
13✔
93
              response as Object,
94
            );
95
            _logger?.internal(
20✔
96
                '🚀 Zero-copy ответ отправлен для $_methodPath [streamId: $_streamId]');
21✔
97
          } else {
98
            // Fallback на стандартную сериализацию для других транспортов
NEW
99
            final serialized = _responseCodec.serialize(response);
×
NEW
100
            _logger?.internal(
×
NEW
101
                'Ответ сериализован, размер: ${serialized.length} байт [streamId: $_streamId]');
×
102

NEW
103
            final framedMessage = RpcMessageFrame.encode(serialized);
×
NEW
104
            await _transport.sendMessage(_streamId, framedMessage);
×
105

NEW
106
            _logger?.internal(
×
NEW
107
                'Ответ отправлен для $_methodPath [streamId: $_streamId]');
×
108
          }
109
        } catch (e, stackTrace) {
110
          // Проверяем, не закрыт ли транспорт
UNCOV
111
          if (e.toString().contains('Transport is closed') ||
×
UNCOV
112
              e.toString().contains('closed')) {
×
113
            _logger?.internal(
×
114
                'Транспорт закрыт, пропускаем отправку ответа [streamId: $_streamId]');
×
115
            return;
116
          }
UNCOV
117
          _logger?.error('Ошибка при отправке ответа [streamId: $_streamId]',
×
118
              error: e, stackTrace: stackTrace);
119
        }
120
      },
121
      onDone: () async {
13✔
122
        if (!_isActive) return;
13✔
123

124
        try {
125
          final trailers = RpcMetadata.forTrailer(RpcStatus.OK);
11✔
126
          await _transport.sendMetadata(_streamId, trailers, endStream: true);
33✔
127
          _logger?.internal(
17✔
128
              'Трейлер отправлен для $_methodPath [streamId: $_streamId]');
18✔
129
        } catch (e, stackTrace) {
130
          // Проверяем, не закрыт ли транспорт
131
          if (e.toString().contains('Transport is closed') ||
×
132
              e.toString().contains('closed')) {
×
133
            _logger?.internal(
×
134
                'Транспорт закрыт, пропускаем отправку трейлера [streamId: $_streamId]');
×
135
            return;
136
          }
137
          _logger?.error('Ошибка при отправке трейлера [streamId: $_streamId]',
×
138
              error: e, stackTrace: stackTrace);
139
        }
140
      },
141
      onError: (error, stackTrace) {
×
142
        _logger?.error(
×
143
            'Ошибка в потоке ответов для $_methodPath [streamId: $_streamId]',
×
144
            error: error,
145
            stackTrace: stackTrace);
146
      },
147
    );
148
  }
149

150
  /// Привязывает процессор к потоку сообщений от endpoint'а
151
  void bindToMessageStream(Stream<RpcTransportMessage> messageStream) {
13✔
152
    if (_messageSubscription != null) {
13✔
153
      _logger?.warning('StreamProcessor уже привязан к потоку сообщений');
1✔
154
      return;
155
    }
156

157
    _logger?.internal(
20✔
158
        'Привязка к потоку сообщений для $_methodPath [streamId: $_streamId]');
21✔
159

160
    _messageSubscription = messageStream.listen(
26✔
161
      _handleMessage,
13✔
162
      onError: (error, stackTrace) {
×
163
        _logger?.error('Ошибка в потоке сообщений',
×
164
            error: error, stackTrace: stackTrace);
165
        if (!_requestController.isClosed) {
×
166
          _requestController.addError(error, stackTrace);
×
167
        }
168
      },
169
      onDone: () {
9✔
170
        _logger?.internal(
16✔
171
            'Поток сообщений завершен для $_methodPath [streamId: $_streamId]');
21✔
172
        if (!_requestController.isClosed) {
18✔
173
          _requestController.close();
18✔
174
        }
175
      },
176
    );
177

178
    // НЕ отправляем начальные метаданные при подключении
179
    // Они будут отправлены при первом успешном ответе
180
    // или пропущены при ошибке (error response отправляется напрямую)
181
  }
182

183
  /// Обрабатывает входящее сообщение
184
  void _handleMessage(RpcTransportMessage message) {
13✔
185
    if (!_isActive) return;
13✔
186

187
    _logger?.internal(
20✔
188
        'Обработка сообщения [streamId: ${message.streamId}, isMetadataOnly: ${message.isMetadataOnly}, hasPayload: ${message.payload != null}, isDirect: ${message.isDirect}, isEndOfStream: ${message.isEndOfStream}]');
42✔
189

190
    // 🚀 Zero-copy: обрабатываем прямой объект
191
    if (message.isDirect && message.directPayload != null) {
22✔
192
      _processDirectMessage(message.directPayload!);
18✔
193
    }
194
    // Обрабатываем сообщения с данными (стандартная сериализация)
195
    else if (!message.isMetadataOnly && message.payload != null) {
12✔
196
      _processDataMessage(message.payload!);
8✔
197
    }
198

199
    // Обрабатываем конец потока
200
    if (message.isEndOfStream) {
13✔
201
      _logger?.internal(
16✔
202
          'Получен END_STREAM, закрываем поток запросов [streamId: $_streamId]');
12✔
203
      if (!_requestController.isClosed) {
20✔
204
        _requestController.close();
20✔
205
      }
206
    }
207
  }
208

209
  /// 🚀 Zero-copy: обрабатывает прямой объект без сериализации
210
  void _processDirectMessage(Object directPayload) {
9✔
211
    _logger?.internal(
14✔
212
        '🚀 Zero-copy обработка прямого объекта [streamId: $_streamId, type: ${directPayload.runtimeType}]');
15✔
213

214
    try {
215
      final request = directPayload as TRequest;
216

217
      if (!_requestController.isClosed) {
18✔
218
        _requestController.add(request);
18✔
219
        _logger?.internal(
14✔
220
            '🚀 Zero-copy запрос добавлен в поток запросов [streamId: $_streamId]');
10✔
221
      } else {
NEW
222
        _logger?.warning(
×
NEW
223
            '🚀 Zero-copy: не могу добавить запрос в закрытый контроллер [streamId: $_streamId]');
×
224
      }
225
    } catch (e, stackTrace) {
NEW
226
      _logger?.error(
×
NEW
227
          '🚀 Zero-copy ошибка при обработке прямого объекта [streamId: $_streamId]',
×
228
          error: e,
229
          stackTrace: stackTrace);
NEW
230
      if (!_requestController.isClosed) {
×
NEW
231
        _requestController.addError(e, stackTrace);
×
232
      }
233
    }
234
  }
235

236
  /// Обрабатывает сообщение с данными
237
  void _processDataMessage(List<int> messageBytes) {
4✔
238
    _logger?.internal(
6✔
239
        'Получено сообщение размером: ${messageBytes.length} байт [streamId: $_streamId]');
6✔
240

241
    try {
242
      // Конвертируем List<int> в Uint8List для парсера
243
      final uint8Message = messageBytes is Uint8List
4✔
244
          ? messageBytes
245
          : Uint8List.fromList(messageBytes);
×
246

247
      final messages = _parser(uint8Message);
8✔
248
      _logger?.internal(
6✔
249
          'Парсер извлек ${messages.length} сообщений из фрейма [streamId: $_streamId]');
6✔
250

251
      for (var msgBytes in messages) {
8✔
252
        try {
253
          _logger?.internal(
6✔
254
              'Десериализация запроса размером ${msgBytes.length} байт [streamId: $_streamId]');
6✔
255
          final request = _requestCodec.deserialize(msgBytes);
8✔
256

257
          if (!_requestController.isClosed) {
8✔
258
            _requestController.add(request);
8✔
259
            _logger?.internal(
6✔
260
                'Запрос десериализован и добавлен в поток запросов [streamId: $_streamId]');
4✔
261
          } else {
262
            _logger?.warning(
×
263
                'Не могу добавить запрос в закрытый контроллер [streamId: $_streamId]');
×
264
          }
265
        } catch (e, stackTrace) {
266
          _logger?.error(
3✔
267
              'Ошибка при десериализации запроса [streamId: $_streamId]',
2✔
268
              error: e,
269
              stackTrace: stackTrace);
270
          if (!_requestController.isClosed) {
4✔
271
            _requestController.addError(e, stackTrace);
4✔
272
          }
273
        }
274
      }
275
    } catch (e, stackTrace) {
276
      _logger?.error('Ошибка при парсинге сообщения [streamId: $_streamId]',
×
277
          error: e, stackTrace: stackTrace);
278
      if (!_requestController.isClosed) {
×
279
        _requestController.addError(e, stackTrace);
×
280
      }
281
    }
282
  }
283

284
  /// Отправляет ответ клиенту
285
  Future<void> send(TResponse response) async {
13✔
286
    if (!_isActive) {
13✔
287
      _logger?.warning('Попытка отправить ответ в неактивный процессор');
2✔
288
      return;
289
    }
290

291
    if (!_responseController.isClosed) {
26✔
292
      _responseController.add(response);
26✔
293
    } else {
294
      _logger?.warning('Попытка отправить ответ в закрытый контроллер');
×
295
    }
296
  }
297

298
  /// Отправляет ошибку клиенту
299
  Future<void> sendError(int statusCode, String message) async {
4✔
300
    if (!_isActive) {
4✔
301
      _logger?.warning('Попытка отправить ошибку в неактивный процессор');
2✔
302
      return;
303
    }
304

305
    _logger?.error(
4✔
306
        'Отправка ошибки клиенту: $statusCode - $message [streamId: $_streamId]');
×
307

308
    if (!_responseController.isClosed) {
8✔
309
      _responseController.close();
8✔
310
    }
311

312
    try {
313
      // Если начальные метаданные не были отправлены, отправляем error response сразу
314
      if (!_initialMetadataSent) {
4✔
315
        _logger?.internal(
4✔
NEW
316
            'Отправка ошибки без начальных метаданных [streamId: $_streamId]');
×
317
        // Создаем комбинированные метаданные: начальный response + error trailer
318
        final errorHeaders = [
4✔
319
          RpcHeader(':status', '200'), // HTTP 200 для gRPC
4✔
320
          RpcHeader(
4✔
321
              RpcConstants.CONTENT_TYPE_HEADER, RpcConstants.GRPC_CONTENT_TYPE),
322
          RpcHeader(RpcConstants.GRPC_STATUS_HEADER, statusCode.toString()),
8✔
323
        ];
324

325
        if (message.isNotEmpty) {
4✔
326
          errorHeaders
327
              .add(RpcHeader(RpcConstants.GRPC_MESSAGE_HEADER, message));
8✔
328
        }
329

330
        final errorMetadata = RpcMetadata(errorHeaders);
4✔
331
        await _transport.sendMetadata(_streamId, errorMetadata,
12✔
332
            endStream: true);
333
        _initialMetadataSent = true;
4✔
334
      } else {
335
        // Начальные метаданные уже отправлены, отправляем только trailer
NEW
336
        final trailers = RpcMetadata.forTrailer(statusCode, message: message);
×
NEW
337
        await _transport.sendMetadata(_streamId, trailers, endStream: true);
×
338
      }
339

340
      _logger?.internal('Ошибка отправлена клиенту [streamId: $_streamId]');
4✔
341
    } catch (e, stackTrace) {
342
      // Проверяем, не закрыт ли транспорт
343
      if (e.toString().contains('Transport is closed') ||
×
344
          e.toString().contains('closed')) {
×
345
        _logger?.internal(
×
NEW
346
            'Транспорт закрыт, пропускаем отправку ошибки [streamId: $_streamId]');
×
347
        return;
348
      }
349
      _logger?.error(
×
NEW
350
          'Ошибка при отправке ошибки клиенту [streamId: $_streamId]',
×
351
          error: e,
352
          stackTrace: stackTrace);
353
    }
354
  }
355

356
  /// Завершает отправку ответов
357
  Future<void> finishSending() async {
11✔
358
    if (!_isActive) return;
11✔
359

360
    _logger?.internal(
17✔
361
        'Завершение отправки ответов для $_methodPath [streamId: $_streamId]');
18✔
362

363
    if (!_responseController.isClosed) {
22✔
364
      await _responseController.close();
20✔
365
    }
366
  }
367

368
  /// Закрывает процессор и освобождает ресурсы
369
  Future<void> close() async {
7✔
370
    if (!_isActive) return;
7✔
371

372
    _logger?.internal(
8✔
373
        'Закрытие StreamProcessor для $_methodPath [streamId: $_streamId]');
3✔
374
    _isActive = false;
7✔
375

376
    await _messageSubscription?.cancel();
14✔
377
    _messageSubscription = null;
7✔
378

379
    if (!_requestController.isClosed) {
14✔
380
      _requestController.close();
12✔
381
    }
382

383
    if (!_responseController.isClosed) {
14✔
384
      _responseController.close();
12✔
385
    }
386
  }
387
}
388

389
/// Базовый процессор для клиентских вызовов RPC стримов.
390
///
391
/// Предоставляет единую основу для всех типов клиентских стримов,
392
/// избегая дублирования логики и inner dependencies.
393
///
394
/// Преимущества:
395
/// - Переиспользование кода между типами стримов
396
/// - Отсутствие race condition
397
/// - Четкое разделение ответственности
398
/// - Тестируемость без внепроцессных зависимостей
399
final class CallProcessor<TRequest extends IRpcSerializable,
400
    TResponse extends IRpcSerializable> {
401
  final RpcLogger? _logger;
402
  final IRpcTransport _transport;
403
  final int _streamId;
404
  final String _serviceName;
405
  final String _methodName;
406
  final IRpcCodec<TRequest> _requestCodec;
407
  final IRpcCodec<TResponse> _responseCodec;
408

409
  /// RPC контекст для передачи метаданных, таймаутов и отмены
410
  final RpcContext? _context;
411

412
  /// Парсер для обработки фрагментированных сообщений
413
  late final RpcMessageParser _parser;
414

415
  /// Контроллер потока исходящих запросов
416
  final StreamController<TRequest> _requestController =
417
      StreamController<TRequest>();
418

419
  /// Контроллер потока входящих ответов
420
  final StreamController<RpcMessage<TResponse>> _responseController =
421
      StreamController<RpcMessage<TResponse>>();
422

423
  /// Подписка на исходящие запросы
424
  StreamSubscription? _requestSubscription;
425

426
  /// Подписка на входящие ответы
427
  StreamSubscription? _responseSubscription;
428

429
  /// Флаг активности процессора
430
  bool _isActive = true;
431

432
  /// Флаг отправки начальных метаданных
433
  bool _initialMetadataSent = false;
434

435
  /// Путь метода в формате /ServiceName/MethodName
436
  late final String _methodPath;
437

438
  CallProcessor({
13✔
439
    required IRpcTransport transport,
440
    required String serviceName,
441
    required String methodName,
442
    required IRpcCodec<TRequest> requestCodec,
443
    required IRpcCodec<TResponse> responseCodec,
444
    RpcContext? context,
445
    RpcLogger? logger,
446
  })  : _transport = transport,
447
        _streamId = transport.createStream(),
13✔
448
        _serviceName = serviceName,
449
        _methodName = methodName,
450
        _requestCodec = requestCodec,
451
        _responseCodec = responseCodec,
452
        _context = context,
453
        _logger = logger?.child('CallProcessor') {
7✔
454
    _parser = RpcMessageParser(logger: _logger);
39✔
455
    _methodPath = '/$_serviceName/$_methodName';
52✔
456

457
    _logger?.internal(
20✔
458
        'Создан CallProcessor для $_methodPath [streamId: $_streamId]');
21✔
459

460
    // Проверяем контекст перед началом работы
461
    _checkContextBeforeCall();
13✔
462

463
    _setupRequestHandler();
13✔
464
    _setupResponseHandler();
13✔
465
  }
466

467
  /// Поток входящих ответов от сервера
468
  Stream<RpcMessage<TResponse>> get responses => _responseController.stream;
39✔
469

470
  /// Активен ли процессор
471
  bool get isActive => _isActive;
6✔
472

473
  /// ID стрима
474
  int get streamId => _streamId;
6✔
475

476
  /// Настраивает обработку исходящих запросов
477
  void _setupRequestHandler() {
13✔
478
    _requestSubscription = _requestController.stream.listen(
52✔
479
      (request) async {
13✔
480
        if (!_isActive) return;
13✔
481

482
        try {
483
          // Отправляем начальные метаданные при первом запросе
484
          if (!_initialMetadataSent) {
13✔
485
            await _sendInitialMetadata();
13✔
486
            _initialMetadataSent = true;
13✔
487
          }
488

489
          _logger?.internal(
20✔
490
              'Отправка запроса для $_methodPath [streamId: $_streamId]');
21✔
491

492
          // 🚀 Zero-copy оптимизация для inmemory транспорта
493
          if (_transport is RpcInMemoryTransport) {
26✔
494
            _logger?.internal(
18✔
495
                '🚀 Zero-copy отправка запроса [streamId: $_streamId]');
12✔
496
            await (_transport as RpcInMemoryTransport).sendDirectObject(
24✔
497
              _streamId,
12✔
498
              request as Object,
499
            );
500
            _logger?.internal(
18✔
501
                '🚀 Zero-copy запрос отправлен для $_methodPath [streamId: $_streamId]');
18✔
502
          } else {
503
            // Fallback на стандартную сериализацию для других транспортов
504
            final serialized = _requestCodec.serialize(request);
4✔
505
            _logger?.internal(
2✔
506
                'Запрос сериализован, размер: ${serialized.length} байт [streamId: $_streamId]');
3✔
507

508
            final framedMessage = RpcMessageFrame.encode(serialized);
1✔
509
            await _transport.sendMessage(_streamId, framedMessage);
3✔
510

511
            _logger?.internal(
2✔
512
                'Запрос отправлен для $_methodPath [streamId: $_streamId]');
3✔
513
          }
514
        } catch (e, stackTrace) {
515
          _logger?.error('Ошибка при отправке запроса [streamId: $_streamId]',
5✔
516
              error: e, stackTrace: stackTrace);
517
          if (!_responseController.isClosed) {
4✔
518
            _responseController.addError(e, stackTrace);
4✔
519
          }
520

521
          // 🔥 КРИТИЧЕСКОЕ ИСПРАВЛЕНИЕ: При ошибке роутинга немедленно завершаем обработку
522
          // Это предотвратит дальнейшую отправку запросов и заставит stream завершиться с ошибкой
523
          if (!_requestController.isClosed) {
4✔
524
            _requestController.close();
2✔
525
          }
526
        }
527
      },
528
      onDone: () async {
11✔
529
        if (!_isActive) return;
11✔
530

531
        try {
532
          await _transport.finishSending(_streamId);
33✔
533
          _logger?.internal(
17✔
534
              'finishSending выполнен для $_methodPath [streamId: $_streamId]');
18✔
535
        } catch (e, stackTrace) {
536
          _logger?.error(
×
537
              'Ошибка при завершении отправки запросов [streamId: $_streamId]',
×
538
              error: e,
539
              stackTrace: stackTrace);
540
        }
541
      },
542
      onError: (error, stackTrace) {
×
543
        _logger?.error(
×
544
            'Ошибка в потоке запросов для $_methodPath [streamId: $_streamId]',
×
545
            error: error,
546
            stackTrace: stackTrace);
547
        if (!_responseController.isClosed) {
×
548
          _responseController.addError(error, stackTrace);
×
549
        }
550
      },
551
    );
552
  }
553

554
  /// Настраивает обработку входящих ответов
555
  void _setupResponseHandler() {
13✔
556
    _responseSubscription = _transport.getMessagesForStream(_streamId).listen(
65✔
557
      _handleResponse,
13✔
558
      onError: (error, stackTrace) {
×
559
        _logger?.error('Ошибка в потоке ответов',
×
560
            error: error, stackTrace: stackTrace);
561
        if (!_responseController.isClosed) {
×
562
          _responseController.addError(error, stackTrace);
×
563
        }
564
      },
565
      onDone: () {
8✔
566
        _logger?.internal(
15✔
567
            'Поток ответов завершен для $_methodPath [streamId: $_streamId]');
21✔
568
        if (!_responseController.isClosed) {
16✔
569
          _responseController.close();
6✔
570
        }
571
      },
572
    );
573
  }
574

575
  /// Отправляет начальные метаданные с поддержкой контекста
576
  Future<void> _sendInitialMetadata() async {
13✔
577
    _logger?.internal(
20✔
578
        'Отправка начальных метаданных для $_methodPath [streamId: $_streamId]');
21✔
579

580
    final baseMetadata =
581
        RpcMetadata.forClientRequest(_serviceName, _methodName);
39✔
582

583
    // Создаем новые метаданные с заголовками из контекста
584
    final headers = List<RpcHeader>.from(baseMetadata.headers);
26✔
585

586
    if (_context != null) {
13✔
587
      // Добавляем пользовательские заголовки из контекста
588
      for (final entry in _context!.headers.entries) {
24✔
589
        headers.add(RpcHeader(entry.key, entry.value));
24✔
590
      }
591

592
      // Добавляем специальные заголовки RPC
593
      if (_context!.traceId != null) {
12✔
594
        headers.add(RpcHeader('x-trace-id', _context!.traceId!));
24✔
595
      }
596
      headers.add(RpcHeader('x-request-id', _context!.requestId));
24✔
597

598
      // Context values остаются ЛОКАЛЬНЫМИ - не передаются через сеть (соответствует стандарту gRPC)
599
      // Только headers передаются через HTTP/2 заголовки
600

601
      _logger?.internal(
12✔
602
          'Добавлены заголовки контекста: ${_context!.headers.length} пользовательских + системные [streamId: $_streamId]');
30✔
603
    } else {
604
      // Даже для null контекста добавляем базовый request-id
605
      final requestId =
606
          RpcContext.empty().requestId; // Генерируем базовый request-id
14✔
607
      headers.add(RpcHeader('x-request-id', requestId));
14✔
608

609
      _logger?.internal(
8✔
610
          'Добавлен базовый request-id для null контекста [streamId: $_streamId]');
2✔
611
    }
612

613
    final metadata = RpcMetadata(headers);
13✔
614
    await _transport.sendMetadata(_streamId, metadata);
39✔
615

616
    _logger?.internal(
20✔
617
        'Начальные метаданные отправлены для $_methodPath [streamId: $_streamId]');
21✔
618
  }
619

620
  /// Проверяет состояние контекста перед вызовом
621
  void _checkContextBeforeCall() {
13✔
622
    if (_context == null) return;
13✔
623

624
    // Проверяем отмену
625
    _context!.cancellationToken?.throwIfCancelled();
17✔
626

627
    // Проверяем deadline
628
    if (_context!.isExpired) {
16✔
629
      throw RpcDeadlineExceededException(
1✔
630
        _context!.deadline!,
2✔
631
        Duration.zero,
632
      );
633
    }
634

635
    _logger?.internal(
15✔
636
        'Контекст проверен: requestId=${_context!.requestId}, traceId=${_context!.traceId} [streamId: $_streamId]');
42✔
637
  }
638

639
  /// Обрабатывает входящий ответ
640
  void _handleResponse(RpcTransportMessage message) {
13✔
641
    if (!_isActive) return;
13✔
642

643
    _logger?.internal(
20✔
644
        'Обработка ответа [streamId: ${message.streamId}, isMetadataOnly: ${message.isMetadataOnly}, hasPayload: ${message.payload != null}, isDirect: ${message.isDirect}]');
35✔
645

646
    try {
647
      // Обрабатываем метаданные
648
      if (message.isMetadataOnly) {
13✔
649
        final rpcMessage = RpcMessage.withMetadata<TResponse>(
11✔
650
          message.metadata!,
11✔
651
          isEndOfStream: message.isEndOfStream,
11✔
652
        );
653

654
        if (!_responseController.isClosed) {
22✔
655
          _responseController.add(rpcMessage);
22✔
656
          _logger?.internal(
17✔
657
              'Метаданные добавлены в поток ответов [streamId: $_streamId]');
12✔
658
        }
659
      }
660

661
      // 🚀 Zero-copy: обрабатываем прямой объект
662
      if (message.isDirect && message.directPayload != null) {
23✔
663
        _processDirectResponse(message.directPayload!);
20✔
664
      }
665
      // Обрабатываем сообщения с данными (стандартная сериализация)
666
      else if (!message.isMetadataOnly && message.payload != null) {
15✔
667
        _processResponseData(message.payload!);
6✔
668
      }
669

670
      // Завершаем поток при получении END_STREAM
671
      if (message.isEndOfStream) {
13✔
672
        _logger?.internal(
17✔
673
            'Получен END_STREAM, закрываем поток ответов [streamId: $_streamId]');
12✔
674
        if (!_responseController.isClosed) {
22✔
675
          _responseController.close();
22✔
676
        }
677
      }
678
    } catch (e, stackTrace) {
679
      _logger?.error('Ошибка при обработке ответа [streamId: $_streamId]',
×
680
          error: e, stackTrace: stackTrace);
681
      if (!_responseController.isClosed) {
×
682
        _responseController.addError(e, stackTrace);
×
683
      }
684
    }
685
  }
686

687
  /// 🚀 Zero-copy: обрабатывает прямой объект ответа без сериализации
688
  void _processDirectResponse(Object directPayload) {
10✔
689
    _logger?.internal(
16✔
690
        '🚀 Zero-copy обработка прямого ответа [streamId: $_streamId, type: ${directPayload.runtimeType}]');
18✔
691

692
    try {
693
      final response = directPayload as TResponse;
694
      final rpcMessage = RpcMessage.withPayload<TResponse>(response);
10✔
695

696
      if (!_responseController.isClosed) {
20✔
697
        _responseController.add(rpcMessage);
20✔
698
        _logger?.internal(
16✔
699
            '🚀 Zero-copy ответ добавлен в поток ответов [streamId: $_streamId]');
12✔
700
      } else {
NEW
701
        _logger?.warning(
×
NEW
702
            '🚀 Zero-copy: не могу добавить ответ в закрытый контроллер [streamId: $_streamId]');
×
703
      }
704
    } catch (e, stackTrace) {
NEW
705
      _logger?.error(
×
NEW
706
          '🚀 Zero-copy ошибка при обработке прямого ответа [streamId: $_streamId]',
×
707
          error: e,
708
          stackTrace: stackTrace);
NEW
709
      if (!_responseController.isClosed) {
×
NEW
710
        _responseController.addError(e, stackTrace);
×
711
      }
712
    }
713
  }
714

715
  /// Обрабатывает данные ответа
716
  void _processResponseData(List<int> messageBytes) {
3✔
717
    _logger?.internal(
4✔
718
        'Получен ответ размером: ${messageBytes.length} байт [streamId: $_streamId]');
3✔
719

720
    try {
721
      final uint8Message = messageBytes is Uint8List
3✔
722
          ? messageBytes
723
          : Uint8List.fromList(messageBytes);
×
724

725
      final messages = _parser(uint8Message);
6✔
726
      _logger?.internal(
4✔
727
          'Парсер извлек ${messages.length} сообщений из фрейма [streamId: $_streamId]');
3✔
728

729
      for (var msgBytes in messages) {
6✔
730
        try {
731
          _logger?.internal(
4✔
732
              'Десериализация ответа размером ${msgBytes.length} байт [streamId: $_streamId]');
3✔
733
          final response = _responseCodec.deserialize(msgBytes);
6✔
734

735
          final rpcMessage = RpcMessage.withPayload<TResponse>(response);
2✔
736

737
          if (!_responseController.isClosed) {
4✔
738
            _responseController.add(rpcMessage);
4✔
739
            _logger?.internal(
3✔
740
                'Ответ десериализован и добавлен в поток ответов [streamId: $_streamId]');
2✔
741
          } else {
742
            _logger?.warning(
×
743
                'Не могу добавить ответ в закрытый контроллер [streamId: $_streamId]');
×
744
          }
745
        } catch (e, stackTrace) {
746
          _logger?.error(
1✔
747
              'Ошибка при десериализации ответа [streamId: $_streamId]',
×
748
              error: e,
749
              stackTrace: stackTrace);
750
          if (!_responseController.isClosed) {
2✔
751
            _responseController.addError(e, stackTrace);
2✔
752
          }
753
        }
754
      }
755
    } catch (e, stackTrace) {
756
      _logger?.error('Ошибка при парсинге ответа [streamId: $_streamId]',
×
757
          error: e, stackTrace: stackTrace);
758
      if (!_responseController.isClosed) {
×
759
        _responseController.addError(e, stackTrace);
×
760
      }
761
    }
762
  }
763

764
  /// Отправляет запрос серверу
765
  Future<void> send(TRequest request) async {
13✔
766
    if (!_isActive) {
13✔
767
      _logger?.warning('Попытка отправить запрос в неактивный процессор');
1✔
768
      return;
769
    }
770

771
    if (!_requestController.isClosed) {
26✔
772
      _requestController.add(request);
26✔
773
    } else {
774
      _logger?.warning('Попытка отправить запрос в закрытый контроллер');
×
775
    }
776
  }
777

778
  /// Завершает отправку запросов
779
  Future<void> finishSending() async {
11✔
780
    if (!_isActive) return;
11✔
781

782
    _logger?.internal(
17✔
783
        'Завершение отправки запросов для $_methodPath [streamId: $_streamId]');
18✔
784

785
    if (!_requestController.isClosed) {
22✔
786
      await _requestController.close();
22✔
787
    }
788
  }
789

790
  /// Закрывает процессор и освобождает ресурсы
791
  Future<void> close() async {
13✔
792
    if (!_isActive) return;
13✔
793

794
    _logger?.internal(
20✔
795
        'Закрытие CallProcessor для $_methodPath [streamId: $_streamId]');
21✔
796
    _isActive = false;
13✔
797

798
    await _requestSubscription?.cancel();
26✔
799
    _requestSubscription = null;
13✔
800

801
    await _responseSubscription?.cancel();
26✔
802
    _responseSubscription = null;
13✔
803

804
    if (!_requestController.isClosed) {
26✔
805
      _requestController.close();
24✔
806
    }
807

808
    if (!_responseController.isClosed) {
26✔
809
      _responseController.close();
24✔
810
    }
811
  }
812
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc