• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19999

25 Sep 2025 12:43PM UTC coverage: 88.57% (-0.02%) from 88.59%
#19999

push

github

web-flow
Observe failOnWarnings for android build (#12040)

Fixes: #6868

34799 of 39290 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.77
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31

32
import com.google.common.annotations.VisibleForTesting;
33
import com.google.common.base.MoreObjects;
34
import io.grpc.Attributes;
35
import io.grpc.CallOptions;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.Codec;
39
import io.grpc.Compressor;
40
import io.grpc.CompressorRegistry;
41
import io.grpc.Context;
42
import io.grpc.Context.CancellationListener;
43
import io.grpc.Deadline;
44
import io.grpc.DecompressorRegistry;
45
import io.grpc.InternalConfigSelector;
46
import io.grpc.InternalDecompressorRegistry;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.MethodDescriptor.MethodType;
50
import io.grpc.Status;
51
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52
import io.perfmark.Link;
53
import io.perfmark.PerfMark;
54
import io.perfmark.Tag;
55
import io.perfmark.TaskCloseable;
56
import java.io.InputStream;
57
import java.nio.charset.Charset;
58
import java.util.Locale;
59
import java.util.concurrent.CancellationException;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.ScheduledExecutorService;
62
import java.util.concurrent.ScheduledFuture;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  private final MethodDescriptor<ReqT, RespT> method;
80
  private final Tag tag;
81
  private final Executor callExecutor;
82
  private final boolean callExecutorIsDirect;
83
  private final CallTracer channelCallsTracer;
84
  private final Context context;
85
  private CancellationHandler cancellationHandler;
86
  private final boolean unaryRequest;
87
  private CallOptions callOptions;
88
  private ClientStream stream;
89
  private boolean cancelCalled;
90
  private boolean halfCloseCalled;
91
  private final ClientStreamProvider clientStreamProvider;
92
  private final ScheduledExecutorService deadlineCancellationExecutor;
93
  private boolean fullStreamDecompression;
94
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
95
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
96

97
  ClientCallImpl(
98
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
99
      ClientStreamProvider clientStreamProvider,
100
      ScheduledExecutorService deadlineCancellationExecutor,
101
      CallTracer channelCallsTracer,
102
      // TODO(zdapeng): remove this arg
103
      @Nullable InternalConfigSelector configSelector) {
1✔
104
    this.method = method;
1✔
105
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
106
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
107
    // If we know that the executor is a direct executor, we don't need to wrap it with a
108
    // SerializingExecutor. This is purely for performance reasons.
109
    // See https://github.com/grpc/grpc-java/issues/368
110
    if (executor == directExecutor()) {
1✔
111
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
112
      callExecutorIsDirect = true;
1✔
113
    } else {
114
      this.callExecutor = new SerializingExecutor(executor);
1✔
115
      callExecutorIsDirect = false;
1✔
116
    }
117
    this.channelCallsTracer = channelCallsTracer;
1✔
118
    // Propagate the context from the thread which initiated the call to all callbacks.
119
    this.context = Context.current();
1✔
120
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
121
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
122
    this.callOptions = callOptions;
1✔
123
    this.clientStreamProvider = clientStreamProvider;
1✔
124
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
125
    PerfMark.event("ClientCall.<init>", tag);
1✔
126
  }
1✔
127

128
  /**
129
   * Provider of {@link ClientStream}s.
130
   */
131
  interface ClientStreamProvider {
132
    ClientStream newStream(
133
        MethodDescriptor<?, ?> method,
134
        CallOptions callOptions,
135
        Metadata headers,
136
        Context context);
137
  }
138

139
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140
    this.fullStreamDecompression = fullStreamDecompression;
1✔
141
    return this;
1✔
142
  }
143

144
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145
    this.decompressorRegistry = decompressorRegistry;
1✔
146
    return this;
1✔
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150
    this.compressorRegistry = compressorRegistry;
1✔
151
    return this;
1✔
152
  }
153

154
  @VisibleForTesting
155
  static void prepareHeaders(
156
      Metadata headers,
157
      DecompressorRegistry decompressorRegistry,
158
      Compressor compressor,
159
      boolean fullStreamDecompression) {
160
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
161
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
162
    if (compressor != Codec.Identity.NONE) {
1✔
163
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
164
    }
165

166
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
167
    byte[] advertisedEncodings =
1✔
168
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
169
    if (advertisedEncodings.length != 0) {
1✔
170
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
171
    }
172

173
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
174
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
175
    if (fullStreamDecompression) {
1✔
176
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void start(Listener<RespT> observer, Metadata headers) {
182
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
183
      PerfMark.attachTag(tag);
1✔
184
      startInternal(observer, headers);
1✔
185
    }
186
  }
1✔
187

188
  private void startInternal(Listener<RespT> observer, Metadata headers) {
189
    checkState(stream == null, "Already started");
1✔
190
    checkState(!cancelCalled, "call was cancelled");
1✔
191
    checkNotNull(observer, "observer");
1✔
192
    checkNotNull(headers, "headers");
1✔
193

194
    if (context.isCancelled()) {
1✔
195
      // Context is already cancelled so no need to create a real stream, just notify the observer
196
      // of cancellation via callback on the executor
197
      stream = NoopClientStream.INSTANCE;
1✔
198
      final Listener<RespT> finalObserver = observer;
1✔
199
      class ClosedByContext extends ContextRunnable {
200
        ClosedByContext() {
1✔
201
          super(context);
1✔
202
        }
1✔
203

204
        @Override
205
        public void runInContext() {
206
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
207
        }
1✔
208
      }
209

210
      callExecutor.execute(new ClosedByContext());
1✔
211
      return;
1✔
212
    }
213
    applyMethodConfig();
1✔
214
    final String compressorName = callOptions.getCompressor();
1✔
215
    Compressor compressor;
216
    if (compressorName != null) {
1✔
217
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
218
      if (compressor == null) {
1✔
219
        stream = NoopClientStream.INSTANCE;
×
220
        final Listener<RespT> finalObserver = observer;
×
221
        class ClosedByNotFoundCompressor extends ContextRunnable {
222
          ClosedByNotFoundCompressor() {
×
223
            super(context);
×
224
          }
×
225

226
          @Override
227
          public void runInContext() {
228
            closeObserver(
×
229
                finalObserver,
230
                Status.INTERNAL.withDescription(
×
231
                    String.format("Unable to find compressor by name %s", compressorName)),
×
232
                new Metadata());
233
          }
×
234
        }
235

236
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
237
        return;
×
238
      }
239
    } else {
240
      compressor = Codec.Identity.NONE;
1✔
241
    }
242
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
243

244
    Deadline effectiveDeadline = effectiveDeadline();
1✔
245
    boolean contextIsDeadlineSource = effectiveDeadline != null
1✔
246
        && effectiveDeadline.equals(context.getDeadline());
1✔
247
    cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
1✔
248
    boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
1✔
249
    if (!deadlineExceeded) {
1✔
250
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
251
    } else {
252
      ClientStreamTracer[] tracers =
1✔
253
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0,
1✔
254
              false, false);
255
      String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
1✔
256
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
257
      String description = String.format(
1✔
258
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
259
              + "Name resolution delay %.9f seconds.", deadlineName,
260
          cancellationHandler.remainingNanos / NANO_TO_SECS,
1✔
261
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
262
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
263
    }
264

265
    if (callExecutorIsDirect) {
1✔
266
      stream.optimizeForDirectExecutor();
1✔
267
    }
268
    if (callOptions.getAuthority() != null) {
1✔
269
      stream.setAuthority(callOptions.getAuthority());
1✔
270
    }
271
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
272
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
273
    }
274
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
275
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
276
    }
277
    if (effectiveDeadline != null) {
1✔
278
      stream.setDeadline(effectiveDeadline);
1✔
279
    }
280
    stream.setCompressor(compressor);
1✔
281
    if (fullStreamDecompression) {
1✔
282
      stream.setFullStreamDecompression(fullStreamDecompression);
×
283
    }
284
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
285
    channelCallsTracer.reportCallStarted();
1✔
286
    stream.start(new ClientStreamListenerImpl(observer));
1✔
287

288
    // Delay any sources of cancellation after start(), because most of the transports are broken if
289
    // they receive cancel before start. Issue #1343 has more details
290

291
    // Propagate later Context cancellation to the remote side.
292
    cancellationHandler.setUp();
1✔
293
  }
1✔
294

295
  private void applyMethodConfig() {
296
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
297
    if (info == null) {
1✔
298
      return;
1✔
299
    }
300
    if (info.timeoutNanos != null) {
1✔
301
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
302
      Deadline existingDeadline = callOptions.getDeadline();
1✔
303
      // If the new deadline is sooner than the existing deadline, swap them.
304
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
305
        callOptions = callOptions.withDeadline(newDeadline);
1✔
306
      }
307
    }
308
    if (info.waitForReady != null) {
1✔
309
      callOptions =
1✔
310
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
311
    }
312
    if (info.maxInboundMessageSize != null) {
1✔
313
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
314
      if (existingLimit != null) {
×
315
        callOptions =
×
316
            callOptions.withMaxInboundMessageSize(
×
317
                Math.min(existingLimit, info.maxInboundMessageSize));
×
318
      } else {
319
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
320
      }
321
    }
322
    if (info.maxOutboundMessageSize != null) {
1✔
323
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
324
      if (existingLimit != null) {
×
325
        callOptions =
×
326
            callOptions.withMaxOutboundMessageSize(
×
327
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
328
      } else {
329
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
330
      }
331
    }
332
  }
1✔
333

334
  private final class CancellationHandler implements Runnable, CancellationListener {
335
    private final boolean contextIsDeadlineSource;
336
    private final boolean hasDeadline;
337
    private final long remainingNanos;
338
    private volatile ScheduledFuture<?> deadlineCancellationFuture;
339
    private volatile boolean tearDownCalled;
340

341
    CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
1✔
342
      this.contextIsDeadlineSource = contextIsDeadlineSource;
1✔
343
      if (deadline == null) {
1✔
344
        hasDeadline = false;
1✔
345
        remainingNanos = 0;
1✔
346
      } else {
347
        hasDeadline = true;
1✔
348
        remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
349
      }
350
    }
1✔
351

352
    void setUp() {
353
      if (tearDownCalled) {
1✔
354
        return;
1✔
355
      }
356
      if (hasDeadline
1✔
357
          // If the context has the effective deadline, we don't need to schedule an extra task.
358
          && !contextIsDeadlineSource
359
          // If the channel has been terminated, we don't need to schedule an extra task.
360
          && deadlineCancellationExecutor != null) {
1✔
361
        deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
1✔
362
            new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
363
      }
364
      context.addListener(this, directExecutor());
1✔
365
      if (tearDownCalled) {
1✔
366
        // Race detected! Re-run to make sure the future is cancelled and context listener removed
367
        tearDown();
×
368
      }
369
    }
1✔
370

371
    // May be called multiple times, and race with setUp()
372
    void tearDown() {
373
      tearDownCalled = true;
1✔
374
      ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
1✔
375
      if (deadlineCancellationFuture != null) {
1✔
376
        deadlineCancellationFuture.cancel(false);
1✔
377
      }
378
      context.removeListener(this);
1✔
379
    }
1✔
380

381
    @Override
382
    public void cancelled(Context context) {
383
      if (hasDeadline && contextIsDeadlineSource
1✔
384
          && context.cancellationCause() instanceof TimeoutException) {
1✔
385
        stream.cancel(formatDeadlineExceededStatus());
1✔
386
        return;
1✔
387
      }
388
      stream.cancel(statusFromCancelled(context));
1✔
389
    }
1✔
390

391
    @Override
392
    public void run() {
393
      stream.cancel(formatDeadlineExceededStatus());
1✔
394
    }
1✔
395

396
    Status formatDeadlineExceededStatus() {
397
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
398
      // stream is created.
399
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
400
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
401

402
      StringBuilder buf = new StringBuilder();
1✔
403
      buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
1✔
404
      buf.append(" deadline exceeded after ");
1✔
405
      if (remainingNanos < 0) {
1✔
406
        buf.append('-');
1✔
407
      }
408
      buf.append(seconds);
1✔
409
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
410
      buf.append("s. ");
1✔
411
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
412
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
1✔
413
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
414
      if (stream != null) {
1✔
415
        InsightBuilder insight = new InsightBuilder();
1✔
416
        stream.appendTimeoutInsight(insight);
1✔
417
        buf.append(" ");
1✔
418
        buf.append(insight);
1✔
419
      }
420
      return DEADLINE_EXCEEDED.withDescription(buf.toString());
1✔
421
    }
422
  }
423

424
  @Nullable
425
  private Deadline effectiveDeadline() {
426
    // Call options and context are immutable, so we don't need to cache the deadline.
427
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
428
  }
429

430
  @Nullable
431
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
432
    if (deadline0 == null) {
1✔
433
      return deadline1;
1✔
434
    }
435
    if (deadline1 == null) {
1✔
436
      return deadline0;
1✔
437
    }
438
    return deadline0.minimum(deadline1);
1✔
439
  }
440

441
  @Override
442
  public void request(int numMessages) {
443
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
444
      PerfMark.attachTag(tag);
1✔
445
      checkState(stream != null, "Not started");
1✔
446
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
447
      stream.request(numMessages);
1✔
448
    }
449
  }
1✔
450

451
  @Override
452
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
453
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
454
      PerfMark.attachTag(tag);
1✔
455
      cancelInternal(message, cause);
1✔
456
    }
457
  }
1✔
458

459
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
460
    if (message == null && cause == null) {
1✔
461
      cause = new CancellationException("Cancelled without a message or cause");
×
462
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
463
    }
464
    if (cancelCalled) {
1✔
465
      return;
1✔
466
    }
467
    cancelCalled = true;
1✔
468
    try {
469
      // Cancel is called in exception handling cases, so it may be the case that the
470
      // stream was never successfully created or start has never been called.
471
      if (stream != null) {
1✔
472
        Status status = Status.CANCELLED;
1✔
473
        if (message != null) {
1✔
474
          status = status.withDescription(message);
1✔
475
        } else {
476
          status = status.withDescription("Call cancelled without message");
1✔
477
        }
478
        if (cause != null) {
1✔
479
          status = status.withCause(cause);
1✔
480
        }
481
        stream.cancel(status);
1✔
482
      }
483
    } finally {
484
      // start() might not have been called
485
      if (cancellationHandler != null) {
1✔
486
        cancellationHandler.tearDown();
1✔
487
      }
488
    }
489
  }
1✔
490

491
  @Override
492
  public void halfClose() {
493
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
494
      PerfMark.attachTag(tag);
1✔
495
      halfCloseInternal();
1✔
496
    }
497
  }
1✔
498

499
  private void halfCloseInternal() {
500
    checkState(stream != null, "Not started");
1✔
501
    checkState(!cancelCalled, "call was cancelled");
1✔
502
    checkState(!halfCloseCalled, "call already half-closed");
1✔
503
    halfCloseCalled = true;
1✔
504
    stream.halfClose();
1✔
505
  }
1✔
506

507
  @Override
508
  public void sendMessage(ReqT message) {
509
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
510
      PerfMark.attachTag(tag);
1✔
511
      sendMessageInternal(message);
1✔
512
    }
513
  }
1✔
514

515
  private void sendMessageInternal(ReqT message) {
516
    checkState(stream != null, "Not started");
1✔
517
    checkState(!cancelCalled, "call was cancelled");
1✔
518
    checkState(!halfCloseCalled, "call was half-closed");
1✔
519
    try {
520
      if (stream instanceof RetriableStream) {
1✔
521
        @SuppressWarnings("unchecked")
522
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
523
        retriableStream.sendMessage(message);
1✔
524
      } else {
1✔
525
        stream.writeMessage(method.streamRequest(message));
1✔
526
      }
527
    } catch (RuntimeException e) {
1✔
528
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
529
      return;
1✔
530
    } catch (Error e) {
×
531
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
532
      throw e;
×
533
    }
1✔
534
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
535
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
536
    // possibility of broken applications forgetting to call halfClose without noticing.
537
    if (!unaryRequest) {
1✔
538
      stream.flush();
1✔
539
    }
540
  }
1✔
541

542
  @Override
543
  public void setMessageCompression(boolean enabled) {
544
    checkState(stream != null, "Not started");
1✔
545
    stream.setMessageCompression(enabled);
1✔
546
  }
1✔
547

548
  @Override
549
  public boolean isReady() {
550
    if (halfCloseCalled) {
1✔
551
      return false;
1✔
552
    }
553
    return stream.isReady();
1✔
554
  }
555

556
  @Override
557
  public Attributes getAttributes() {
558
    if (stream != null) {
1✔
559
      return stream.getAttributes();
1✔
560
    }
561
    return Attributes.EMPTY;
1✔
562
  }
563

564
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
565
    try {
566
      observer.onClose(status, trailers);
1✔
567
    } catch (RuntimeException ex) {
1✔
568
      log.log(Level.WARNING, "Exception thrown by onClose() in ClientCall", ex);
1✔
569
    }
1✔
570
  }
1✔
571

572
  @Override
573
  public String toString() {
574
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
575
  }
576

577
  private class ClientStreamListenerImpl implements ClientStreamListener {
578
    /** The application listener that stream events are forwarded to. */
    private final Listener<RespT> observer;
    /** Set once an observer callback has thrown; later events are dropped and onClose uses it. */
    private Status exceptionStatus;

    /**
     * @param observer the application listener; must not be null
     * @throws NullPointerException if {@code observer} is null
     */
    public ClientStreamListenerImpl(Listener<RespT> observer) {
      checkNotNull(observer, "observer");
      this.observer = observer;
    }
1✔
584

585
    /**
586
     * Cancels call and schedules onClose() notification. May only be called from the application
587
     * thread.
588
     */
589
    private void exceptionThrown(Status status) {
590
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
591
      // will be no further callbacks. We set the status here and overwrite the onClose() details
592
      // when it arrives.
593
      exceptionStatus = status;
1✔
594
      stream.cancel(status);
1✔
595
    }
1✔
596

597
    @Override
598
    public void headersRead(final Metadata headers) {
599
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
600
        PerfMark.attachTag(tag);
1✔
601
        final Link link = PerfMark.linkOut();
1✔
602
        final class HeadersRead extends ContextRunnable {
603
          HeadersRead() {
1✔
604
            super(context);
1✔
605
          }
1✔
606

607
          @Override
608
          public void runInContext() {
609
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
610
              PerfMark.attachTag(tag);
1✔
611
              PerfMark.linkIn(link);
1✔
612
              runInternal();
1✔
613
            }
614
          }
1✔
615

616
          private void runInternal() {
617
            if (exceptionStatus != null) {
1✔
618
              return;
×
619
            }
620
            try {
621
              observer.onHeaders(headers);
1✔
622
            } catch (Throwable t) {
1✔
623
              exceptionThrown(
1✔
624
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
625
            }
1✔
626
          }
1✔
627
        }
628

629
        callExecutor.execute(new HeadersRead());
1✔
630
      }
631
    }
1✔
632

633
    @Override
634
    public void messagesAvailable(final MessageProducer producer) {
635
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
636
        PerfMark.attachTag(tag);
1✔
637
        final Link link = PerfMark.linkOut();
1✔
638
        final class MessagesAvailable extends ContextRunnable {
639
          MessagesAvailable() {
1✔
640
            super(context);
1✔
641
          }
1✔
642

643
          @Override
644
          public void runInContext() {
645
            try (TaskCloseable ignore =
1✔
646
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
647
              PerfMark.attachTag(tag);
1✔
648
              PerfMark.linkIn(link);
1✔
649
              runInternal();
1✔
650
            }
651
          }
1✔
652

653
          private void runInternal() {
654
            if (exceptionStatus != null) {
1✔
655
              GrpcUtil.closeQuietly(producer);
×
656
              return;
×
657
            }
658
            try {
659
              InputStream message;
660
              while ((message = producer.next()) != null) {
1✔
661
                try {
662
                  observer.onMessage(method.parseResponse(message));
1✔
663
                } catch (Throwable t) {
1✔
664
                  GrpcUtil.closeQuietly(message);
1✔
665
                  throw t;
1✔
666
                }
1✔
667
                message.close();
1✔
668
              }
669
            } catch (Throwable t) {
1✔
670
              GrpcUtil.closeQuietly(producer);
1✔
671
              exceptionThrown(
1✔
672
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
673
            }
1✔
674
          }
1✔
675
        }
676

677
        callExecutor.execute(new MessagesAvailable());
1✔
678
      }
679
    }
1✔
680

681
    @Override
682
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
683
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
684
        PerfMark.attachTag(tag);
1✔
685
        closedInternal(status, rpcProgress, trailers);
1✔
686
      }
687
    }
1✔
688

689
    private void closedInternal(
690
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
691
      Deadline deadline = effectiveDeadline();
1✔
692
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
693
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
694
        // description. Since our timer may be delayed in firing, we double-check the deadline and
695
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
696
        if (deadline.isExpired()) {
1✔
697
          status = cancellationHandler.formatDeadlineExceededStatus();
1✔
698
          // Replace trailers to prevent mixing sources of status and trailers.
699
          trailers = new Metadata();
1✔
700
        }
701
      }
702
      final Status savedStatus = status;
1✔
703
      final Metadata savedTrailers = trailers;
1✔
704
      final Link link = PerfMark.linkOut();
1✔
705
      final class StreamClosed extends ContextRunnable {
706
        StreamClosed() {
1✔
707
          super(context);
1✔
708
        }
1✔
709

710
        @Override
711
        public void runInContext() {
712
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
713
            PerfMark.attachTag(tag);
1✔
714
            PerfMark.linkIn(link);
1✔
715
            runInternal();
1✔
716
          }
717
        }
1✔
718

719
        private void runInternal() {
720
          cancellationHandler.tearDown();
1✔
721
          Status status = savedStatus;
1✔
722
          Metadata trailers = savedTrailers;
1✔
723
          if (exceptionStatus != null) {
1✔
724
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
725
            // However the cancel is racy and this closed() may have already been queued when the
726
            // cancellation occurred. Since other calls like onMessage() will throw away data if
727
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
728
            // provided by the server.
729
            status = exceptionStatus;
1✔
730
            // Replace trailers to prevent mixing sources of status and trailers.
731
            trailers = new Metadata();
1✔
732
          }
733
          try {
734
            closeObserver(observer, status, trailers);
1✔
735
          } finally {
736
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
737
          }
738
        }
1✔
739
      }
740

741
      callExecutor.execute(new StreamClosed());
1✔
742
    }
1✔
743

744
    @Override
745
    public void onReady() {
746
      if (method.getType().clientSendsOneMessage()) {
1✔
747
        return;
1✔
748
      }
749
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
750
        PerfMark.attachTag(tag);
1✔
751
        final Link link = PerfMark.linkOut();
1✔
752

753
        final class StreamOnReady extends ContextRunnable {
754
          StreamOnReady() {
1✔
755
            super(context);
1✔
756
          }
1✔
757

758
          @Override
759
          public void runInContext() {
760
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
761
              PerfMark.attachTag(tag);
1✔
762
              PerfMark.linkIn(link);
1✔
763
              runInternal();
1✔
764
            }
765
          }
1✔
766

767
          private void runInternal() {
768
            if (exceptionStatus != null) {
1✔
769
              return;
×
770
            }
771
            try {
772
              observer.onReady();
1✔
773
            } catch (Throwable t) {
1✔
774
              exceptionThrown(
1✔
775
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
776
            }
1✔
777
          }
1✔
778
        }
779

780
        callExecutor.execute(new StreamOnReady());
1✔
781
      }
782
    }
1✔
783
  }
784
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc