• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19862

13 Jun 2025 03:18PM UTC coverage: 88.578% (+0.006%) from 88.572%
#19862

push

github

ejona86
xds: Avoid changing cache when watching children in XdsDepManager

The watchers can be completely regular, so the base class can do the
cache management while the subclasses are only concerned with
subscribing to children.

34564 of 39021 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.77
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31

32
import com.google.common.annotations.VisibleForTesting;
33
import com.google.common.base.MoreObjects;
34
import io.grpc.Attributes;
35
import io.grpc.CallOptions;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.Codec;
39
import io.grpc.Compressor;
40
import io.grpc.CompressorRegistry;
41
import io.grpc.Context;
42
import io.grpc.Context.CancellationListener;
43
import io.grpc.Deadline;
44
import io.grpc.DecompressorRegistry;
45
import io.grpc.InternalConfigSelector;
46
import io.grpc.InternalDecompressorRegistry;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.MethodDescriptor.MethodType;
50
import io.grpc.Status;
51
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52
import io.perfmark.Link;
53
import io.perfmark.PerfMark;
54
import io.perfmark.Tag;
55
import io.perfmark.TaskCloseable;
56
import java.io.InputStream;
57
import java.nio.charset.Charset;
58
import java.util.Locale;
59
import java.util.concurrent.CancellationException;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.ScheduledExecutorService;
62
import java.util.concurrent.ScheduledFuture;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  // Logger used for cancellation warnings and for exceptions thrown by onClose().
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  // Value written to the content-accept-encoding header when full-stream decompression is enabled
  // (see prepareHeaders).
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  // Number of nanoseconds in one second, as a double; used as the divisor when formatting
  // nanosecond durations as fractional seconds.
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  // The RPC method this call invokes.
  private final MethodDescriptor<ReqT, RespT> method;
80
  // PerfMark tag attached to every trace task of this call.
  private final Tag tag;
81
  // Executor on which all listener callbacks are run; wraps the executor given to the constructor.
  private final Executor callExecutor;
82
  // True when the constructor was given directExecutor().
  private final boolean callExecutorIsDirect;
83
  // Notified when the call starts and when it ends.
  private final CallTracer channelCallsTracer;
84
  // Context captured on the constructing thread; callbacks run in it and its cancellation is
  // propagated to the stream.
  private final Context context;
85
  // Created in startInternal(); stays null if the call was never started or ended early there.
  private CancellationHandler cancellationHandler;
86
  // True for UNARY and SERVER_STREAMING methods; sendMessageInternal() skips flush() for these.
  private final boolean unaryRequest;
87
  // Not final: applyMethodConfig() may replace it with a copy carrying method-config overrides.
  private CallOptions callOptions;
88
  // Null until startInternal() assigns it.
  private ClientStream stream;
89
  // Set by the first cancel(); later cancels become no-ops.
  private boolean cancelCalled;
90
  // Set by halfClose(); afterwards isReady() reports false and sendMessage() is rejected.
  private boolean halfCloseCalled;
91
  // Factory used by startInternal() to create the stream.
  private final ClientStreamProvider clientStreamProvider;
92
  // Scheduler for the deadline timer; CancellationHandler.setUp() skips the timer when it is null.
  private final ScheduledExecutorService deadlineCancellationExecutor;
93
  // Configured through setFullStreamDecompression(); defaults to false.
  private boolean fullStreamDecompression;
94
  // Configurable through setDecompressorRegistry(); defaults to the registry's default instance.
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
95
  // Configurable through setCompressorRegistry(); defaults to the registry's default instance.
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
96

97
  ClientCallImpl(
98
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
99
      ClientStreamProvider clientStreamProvider,
100
      ScheduledExecutorService deadlineCancellationExecutor,
101
      CallTracer channelCallsTracer,
102
      // TODO(zdapeng): remove this arg
103
      @Nullable InternalConfigSelector configSelector) {
1✔
104
    this.method = method;
1✔
105
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
106
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
107
    // If we know that the executor is a direct executor, we don't need to wrap it with a
108
    // SerializingExecutor. This is purely for performance reasons.
109
    // See https://github.com/grpc/grpc-java/issues/368
110
    if (executor == directExecutor()) {
1✔
111
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
112
      callExecutorIsDirect = true;
1✔
113
    } else {
114
      this.callExecutor = new SerializingExecutor(executor);
1✔
115
      callExecutorIsDirect = false;
1✔
116
    }
117
    this.channelCallsTracer = channelCallsTracer;
1✔
118
    // Propagate the context from the thread which initiated the call to all callbacks.
119
    this.context = Context.current();
1✔
120
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
121
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
122
    this.callOptions = callOptions;
1✔
123
    this.clientStreamProvider = clientStreamProvider;
1✔
124
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
125
    PerfMark.event("ClientCall.<init>", tag);
1✔
126
  }
1✔
127

128
  /**
   * Provider of {@link ClientStream}s. The enclosing call asks it for a stream when it is started
   * and its deadline has not already expired.
   */
131
  interface ClientStreamProvider {
132
    /**
     * Creates the stream for one call. The enclosing class invokes this with its method
     * descriptor, the call options after the method config was applied, the headers after the
     * compression headers were prepared, and the context captured at construction.
     */
    ClientStream newStream(
133
        MethodDescriptor<?, ?> method,
134
        CallOptions callOptions,
135
        Metadata headers,
136
        Context context);
137
  }
138

139
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140
    this.fullStreamDecompression = fullStreamDecompression;
1✔
141
    return this;
1✔
142
  }
143

144
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145
    this.decompressorRegistry = decompressorRegistry;
1✔
146
    return this;
1✔
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150
    this.compressorRegistry = compressorRegistry;
1✔
151
    return this;
1✔
152
  }
153

154
  @VisibleForTesting
155
  static void prepareHeaders(
156
      Metadata headers,
157
      DecompressorRegistry decompressorRegistry,
158
      Compressor compressor,
159
      boolean fullStreamDecompression) {
160
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
161
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
162
    if (compressor != Codec.Identity.NONE) {
1✔
163
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
164
    }
165

166
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
167
    byte[] advertisedEncodings =
1✔
168
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
169
    if (advertisedEncodings.length != 0) {
1✔
170
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
171
    }
172

173
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
174
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
175
    if (fullStreamDecompression) {
1✔
176
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void start(Listener<RespT> observer, Metadata headers) {
182
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
183
      PerfMark.attachTag(tag);
1✔
184
      startInternal(observer, headers);
1✔
185
    }
186
  }
1✔
187

188
  private void startInternal(Listener<RespT> observer, Metadata headers) {
189
    checkState(stream == null, "Already started");
1✔
190
    checkState(!cancelCalled, "call was cancelled");
1✔
191
    checkNotNull(observer, "observer");
1✔
192
    checkNotNull(headers, "headers");
1✔
193

194
    if (context.isCancelled()) {
1✔
195
      // Context is already cancelled so no need to create a real stream, just notify the observer
196
      // of cancellation via callback on the executor
197
      stream = NoopClientStream.INSTANCE;
1✔
198
      final Listener<RespT> finalObserver = observer;
1✔
199
      class ClosedByContext extends ContextRunnable {
200
        ClosedByContext() {
1✔
201
          super(context);
1✔
202
        }
1✔
203

204
        @Override
205
        public void runInContext() {
206
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
207
        }
1✔
208
      }
209

210
      callExecutor.execute(new ClosedByContext());
1✔
211
      return;
1✔
212
    }
213
    applyMethodConfig();
1✔
214
    final String compressorName = callOptions.getCompressor();
1✔
215
    Compressor compressor;
216
    if (compressorName != null) {
1✔
217
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
218
      if (compressor == null) {
1✔
219
        stream = NoopClientStream.INSTANCE;
×
220
        final Listener<RespT> finalObserver = observer;
×
221
        class ClosedByNotFoundCompressor extends ContextRunnable {
222
          ClosedByNotFoundCompressor() {
×
223
            super(context);
×
224
          }
×
225

226
          @Override
227
          public void runInContext() {
228
            closeObserver(
×
229
                finalObserver,
230
                Status.INTERNAL.withDescription(
×
231
                    String.format("Unable to find compressor by name %s", compressorName)),
×
232
                new Metadata());
233
          }
×
234
        }
235

236
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
237
        return;
×
238
      }
239
    } else {
240
      compressor = Codec.Identity.NONE;
1✔
241
    }
242
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
243

244
    Deadline effectiveDeadline = effectiveDeadline();
1✔
245
    boolean contextIsDeadlineSource = effectiveDeadline != null
1✔
246
        && effectiveDeadline.equals(context.getDeadline());
1✔
247
    cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
1✔
248
    boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
1✔
249
    if (!deadlineExceeded) {
1✔
250
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
251
    } else {
252
      ClientStreamTracer[] tracers =
1✔
253
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
1✔
254
      String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
1✔
255
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
256
      String description = String.format(
1✔
257
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
258
              + "Name resolution delay %.9f seconds.", deadlineName,
259
          cancellationHandler.remainingNanos / NANO_TO_SECS,
1✔
260
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
261
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
262
    }
263

264
    if (callExecutorIsDirect) {
1✔
265
      stream.optimizeForDirectExecutor();
1✔
266
    }
267
    if (callOptions.getAuthority() != null) {
1✔
268
      stream.setAuthority(callOptions.getAuthority());
1✔
269
    }
270
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
271
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
272
    }
273
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
274
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
275
    }
276
    if (effectiveDeadline != null) {
1✔
277
      stream.setDeadline(effectiveDeadline);
1✔
278
    }
279
    stream.setCompressor(compressor);
1✔
280
    if (fullStreamDecompression) {
1✔
281
      stream.setFullStreamDecompression(fullStreamDecompression);
×
282
    }
283
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
284
    channelCallsTracer.reportCallStarted();
1✔
285
    stream.start(new ClientStreamListenerImpl(observer));
1✔
286

287
    // Delay any sources of cancellation after start(), because most of the transports are broken if
288
    // they receive cancel before start. Issue #1343 has more details
289

290
    // Propagate later Context cancellation to the remote side.
291
    cancellationHandler.setUp();
1✔
292
  }
1✔
293

294
  private void applyMethodConfig() {
295
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
296
    if (info == null) {
1✔
297
      return;
1✔
298
    }
299
    if (info.timeoutNanos != null) {
1✔
300
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
301
      Deadline existingDeadline = callOptions.getDeadline();
1✔
302
      // If the new deadline is sooner than the existing deadline, swap them.
303
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
304
        callOptions = callOptions.withDeadline(newDeadline);
1✔
305
      }
306
    }
307
    if (info.waitForReady != null) {
1✔
308
      callOptions =
1✔
309
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
310
    }
311
    if (info.maxInboundMessageSize != null) {
1✔
312
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
313
      if (existingLimit != null) {
×
314
        callOptions =
×
315
            callOptions.withMaxInboundMessageSize(
×
316
                Math.min(existingLimit, info.maxInboundMessageSize));
×
317
      } else {
318
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
319
      }
320
    }
321
    if (info.maxOutboundMessageSize != null) {
1✔
322
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
323
      if (existingLimit != null) {
×
324
        callOptions =
×
325
            callOptions.withMaxOutboundMessageSize(
×
326
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
327
      } else {
328
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
329
      }
330
    }
331
  }
1✔
332

333
  private final class CancellationHandler implements Runnable, CancellationListener {
334
    private final boolean contextIsDeadlineSource;
335
    private final boolean hasDeadline;
336
    private final long remainingNanos;
337
    private volatile ScheduledFuture<?> deadlineCancellationFuture;
338
    private volatile boolean tearDownCalled;
339

340
    CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
1✔
341
      this.contextIsDeadlineSource = contextIsDeadlineSource;
1✔
342
      if (deadline == null) {
1✔
343
        hasDeadline = false;
1✔
344
        remainingNanos = 0;
1✔
345
      } else {
346
        hasDeadline = true;
1✔
347
        remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
348
      }
349
    }
1✔
350

351
    void setUp() {
352
      if (tearDownCalled) {
1✔
353
        return;
1✔
354
      }
355
      if (hasDeadline
1✔
356
          // If the context has the effective deadline, we don't need to schedule an extra task.
357
          && !contextIsDeadlineSource
358
          // If the channel has been terminated, we don't need to schedule an extra task.
359
          && deadlineCancellationExecutor != null) {
1✔
360
        deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
1✔
361
            new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
362
      }
363
      context.addListener(this, directExecutor());
1✔
364
      if (tearDownCalled) {
1✔
365
        // Race detected! Re-run to make sure the future is cancelled and context listener removed
366
        tearDown();
×
367
      }
368
    }
1✔
369

370
    // May be called multiple times, and race with setUp()
371
    void tearDown() {
372
      tearDownCalled = true;
1✔
373
      ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
1✔
374
      if (deadlineCancellationFuture != null) {
1✔
375
        deadlineCancellationFuture.cancel(false);
1✔
376
      }
377
      context.removeListener(this);
1✔
378
    }
1✔
379

380
    @Override
381
    public void cancelled(Context context) {
382
      if (hasDeadline && contextIsDeadlineSource
1✔
383
          && context.cancellationCause() instanceof TimeoutException) {
1✔
384
        stream.cancel(formatDeadlineExceededStatus());
1✔
385
        return;
1✔
386
      }
387
      stream.cancel(statusFromCancelled(context));
1✔
388
    }
1✔
389

390
    @Override
391
    public void run() {
392
      stream.cancel(formatDeadlineExceededStatus());
1✔
393
    }
1✔
394

395
    Status formatDeadlineExceededStatus() {
396
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
397
      // stream is created.
398
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
399
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
400

401
      StringBuilder buf = new StringBuilder();
1✔
402
      buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
1✔
403
      buf.append(" deadline exceeded after ");
1✔
404
      if (remainingNanos < 0) {
1✔
405
        buf.append('-');
1✔
406
      }
407
      buf.append(seconds);
1✔
408
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
409
      buf.append("s. ");
1✔
410
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
411
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
1✔
412
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
413
      if (stream != null) {
1✔
414
        InsightBuilder insight = new InsightBuilder();
1✔
415
        stream.appendTimeoutInsight(insight);
1✔
416
        buf.append(" ");
1✔
417
        buf.append(insight);
1✔
418
      }
419
      return DEADLINE_EXCEEDED.withDescription(buf.toString());
1✔
420
    }
421
  }
422

423
  @Nullable
424
  private Deadline effectiveDeadline() {
425
    // Call options and context are immutable, so we don't need to cache the deadline.
426
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
427
  }
428

429
  @Nullable
430
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
431
    if (deadline0 == null) {
1✔
432
      return deadline1;
1✔
433
    }
434
    if (deadline1 == null) {
1✔
435
      return deadline0;
1✔
436
    }
437
    return deadline0.minimum(deadline1);
1✔
438
  }
439

440
  @Override
441
  public void request(int numMessages) {
442
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
443
      PerfMark.attachTag(tag);
1✔
444
      checkState(stream != null, "Not started");
1✔
445
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
446
      stream.request(numMessages);
1✔
447
    }
448
  }
1✔
449

450
  @Override
451
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
452
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
453
      PerfMark.attachTag(tag);
1✔
454
      cancelInternal(message, cause);
1✔
455
    }
456
  }
1✔
457

458
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
459
    if (message == null && cause == null) {
1✔
460
      cause = new CancellationException("Cancelled without a message or cause");
×
461
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
462
    }
463
    if (cancelCalled) {
1✔
464
      return;
1✔
465
    }
466
    cancelCalled = true;
1✔
467
    try {
468
      // Cancel is called in exception handling cases, so it may be the case that the
469
      // stream was never successfully created or start has never been called.
470
      if (stream != null) {
1✔
471
        Status status = Status.CANCELLED;
1✔
472
        if (message != null) {
1✔
473
          status = status.withDescription(message);
1✔
474
        } else {
475
          status = status.withDescription("Call cancelled without message");
1✔
476
        }
477
        if (cause != null) {
1✔
478
          status = status.withCause(cause);
1✔
479
        }
480
        stream.cancel(status);
1✔
481
      }
482
    } finally {
483
      // start() might not have been called
484
      if (cancellationHandler != null) {
1✔
485
        cancellationHandler.tearDown();
1✔
486
      }
487
    }
488
  }
1✔
489

490
  @Override
491
  public void halfClose() {
492
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
493
      PerfMark.attachTag(tag);
1✔
494
      halfCloseInternal();
1✔
495
    }
496
  }
1✔
497

498
  private void halfCloseInternal() {
499
    checkState(stream != null, "Not started");
1✔
500
    checkState(!cancelCalled, "call was cancelled");
1✔
501
    checkState(!halfCloseCalled, "call already half-closed");
1✔
502
    halfCloseCalled = true;
1✔
503
    stream.halfClose();
1✔
504
  }
1✔
505

506
  @Override
507
  public void sendMessage(ReqT message) {
508
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
509
      PerfMark.attachTag(tag);
1✔
510
      sendMessageInternal(message);
1✔
511
    }
512
  }
1✔
513

514
  private void sendMessageInternal(ReqT message) {
515
    checkState(stream != null, "Not started");
1✔
516
    checkState(!cancelCalled, "call was cancelled");
1✔
517
    checkState(!halfCloseCalled, "call was half-closed");
1✔
518
    try {
519
      if (stream instanceof RetriableStream) {
1✔
520
        @SuppressWarnings("unchecked")
521
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
522
        retriableStream.sendMessage(message);
1✔
523
      } else {
1✔
524
        stream.writeMessage(method.streamRequest(message));
1✔
525
      }
526
    } catch (RuntimeException e) {
1✔
527
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
528
      return;
1✔
529
    } catch (Error e) {
×
530
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
531
      throw e;
×
532
    }
1✔
533
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
534
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
535
    // possibility of broken applications forgetting to call halfClose without noticing.
536
    if (!unaryRequest) {
1✔
537
      stream.flush();
1✔
538
    }
539
  }
1✔
540

541
  @Override
542
  public void setMessageCompression(boolean enabled) {
543
    checkState(stream != null, "Not started");
1✔
544
    stream.setMessageCompression(enabled);
1✔
545
  }
1✔
546

547
  @Override
548
  public boolean isReady() {
549
    if (halfCloseCalled) {
1✔
550
      return false;
1✔
551
    }
552
    return stream.isReady();
1✔
553
  }
554

555
  @Override
556
  public Attributes getAttributes() {
557
    if (stream != null) {
1✔
558
      return stream.getAttributes();
1✔
559
    }
560
    return Attributes.EMPTY;
1✔
561
  }
562

563
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
564
    try {
565
      observer.onClose(status, trailers);
1✔
566
    } catch (RuntimeException ex) {
1✔
567
      log.log(Level.WARNING, "Exception thrown by onClose() in ClientCall", ex);
1✔
568
    }
1✔
569
  }
1✔
570

571
  @Override
572
  public String toString() {
573
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
574
  }
575

576
  private class ClientStreamListenerImpl implements ClientStreamListener {
577
    private final Listener<RespT> observer;
578
    private Status exceptionStatus;
579

580
    public ClientStreamListenerImpl(Listener<RespT> observer) {
1✔
581
      this.observer = checkNotNull(observer, "observer");
1✔
582
    }
1✔
583

584
    /**
585
     * Cancels call and schedules onClose() notification. May only be called from the application
586
     * thread.
587
     */
588
    private void exceptionThrown(Status status) {
589
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
590
      // will be no further callbacks. We set the status here and overwrite the onClose() details
591
      // when it arrives.
592
      exceptionStatus = status;
1✔
593
      stream.cancel(status);
1✔
594
    }
1✔
595

596
    @Override
597
    public void headersRead(final Metadata headers) {
598
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
599
        PerfMark.attachTag(tag);
1✔
600
        final Link link = PerfMark.linkOut();
1✔
601
        final class HeadersRead extends ContextRunnable {
602
          HeadersRead() {
1✔
603
            super(context);
1✔
604
          }
1✔
605

606
          @Override
607
          public void runInContext() {
608
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
609
              PerfMark.attachTag(tag);
1✔
610
              PerfMark.linkIn(link);
1✔
611
              runInternal();
1✔
612
            }
613
          }
1✔
614

615
          private void runInternal() {
616
            if (exceptionStatus != null) {
1✔
617
              return;
×
618
            }
619
            try {
620
              observer.onHeaders(headers);
1✔
621
            } catch (Throwable t) {
1✔
622
              exceptionThrown(
1✔
623
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
624
            }
1✔
625
          }
1✔
626
        }
627

628
        callExecutor.execute(new HeadersRead());
1✔
629
      }
630
    }
1✔
631

632
    @Override
633
    public void messagesAvailable(final MessageProducer producer) {
634
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
635
        PerfMark.attachTag(tag);
1✔
636
        final Link link = PerfMark.linkOut();
1✔
637
        final class MessagesAvailable extends ContextRunnable {
638
          MessagesAvailable() {
1✔
639
            super(context);
1✔
640
          }
1✔
641

642
          @Override
643
          public void runInContext() {
644
            try (TaskCloseable ignore =
1✔
645
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
646
              PerfMark.attachTag(tag);
1✔
647
              PerfMark.linkIn(link);
1✔
648
              runInternal();
1✔
649
            }
650
          }
1✔
651

652
          private void runInternal() {
653
            if (exceptionStatus != null) {
1✔
654
              GrpcUtil.closeQuietly(producer);
×
655
              return;
×
656
            }
657
            try {
658
              InputStream message;
659
              while ((message = producer.next()) != null) {
1✔
660
                try {
661
                  observer.onMessage(method.parseResponse(message));
1✔
662
                } catch (Throwable t) {
1✔
663
                  GrpcUtil.closeQuietly(message);
1✔
664
                  throw t;
1✔
665
                }
1✔
666
                message.close();
1✔
667
              }
668
            } catch (Throwable t) {
1✔
669
              GrpcUtil.closeQuietly(producer);
1✔
670
              exceptionThrown(
1✔
671
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
672
            }
1✔
673
          }
1✔
674
        }
675

676
        callExecutor.execute(new MessagesAvailable());
1✔
677
      }
678
    }
1✔
679

680
    @Override
681
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
682
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
683
        PerfMark.attachTag(tag);
1✔
684
        closedInternal(status, rpcProgress, trailers);
1✔
685
      }
686
    }
1✔
687

688
    private void closedInternal(
689
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
690
      Deadline deadline = effectiveDeadline();
1✔
691
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
692
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
693
        // description. Since our timer may be delayed in firing, we double-check the deadline and
694
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
695
        if (deadline.isExpired()) {
1✔
696
          status = cancellationHandler.formatDeadlineExceededStatus();
1✔
697
          // Replace trailers to prevent mixing sources of status and trailers.
698
          trailers = new Metadata();
1✔
699
        }
700
      }
701
      final Status savedStatus = status;
1✔
702
      final Metadata savedTrailers = trailers;
1✔
703
      final Link link = PerfMark.linkOut();
1✔
704
      final class StreamClosed extends ContextRunnable {
705
        StreamClosed() {
1✔
706
          super(context);
1✔
707
        }
1✔
708

709
        @Override
710
        public void runInContext() {
711
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
712
            PerfMark.attachTag(tag);
1✔
713
            PerfMark.linkIn(link);
1✔
714
            runInternal();
1✔
715
          }
716
        }
1✔
717

718
        private void runInternal() {
719
          cancellationHandler.tearDown();
1✔
720
          Status status = savedStatus;
1✔
721
          Metadata trailers = savedTrailers;
1✔
722
          if (exceptionStatus != null) {
1✔
723
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
724
            // However the cancel is racy and this closed() may have already been queued when the
725
            // cancellation occurred. Since other calls like onMessage() will throw away data if
726
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
727
            // provided by the server.
728
            status = exceptionStatus;
1✔
729
            // Replace trailers to prevent mixing sources of status and trailers.
730
            trailers = new Metadata();
1✔
731
          }
732
          try {
733
            closeObserver(observer, status, trailers);
1✔
734
          } finally {
735
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
736
          }
737
        }
1✔
738
      }
739

740
      callExecutor.execute(new StreamClosed());
1✔
741
    }
1✔
742

743
    @Override
744
    public void onReady() {
745
      if (method.getType().clientSendsOneMessage()) {
1✔
746
        return;
1✔
747
      }
748
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
749
        PerfMark.attachTag(tag);
1✔
750
        final Link link = PerfMark.linkOut();
1✔
751

752
        final class StreamOnReady extends ContextRunnable {
753
          StreamOnReady() {
1✔
754
            super(context);
1✔
755
          }
1✔
756

757
          @Override
758
          public void runInContext() {
759
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
760
              PerfMark.attachTag(tag);
1✔
761
              PerfMark.linkIn(link);
1✔
762
              runInternal();
1✔
763
            }
764
          }
1✔
765

766
          private void runInternal() {
767
            if (exceptionStatus != null) {
1✔
768
              return;
×
769
            }
770
            try {
771
              observer.onReady();
1✔
772
            } catch (Throwable t) {
1✔
773
              exceptionThrown(
1✔
774
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
775
            }
1✔
776
          }
1✔
777
        }
778

779
        callExecutor.execute(new StreamOnReady());
1✔
780
      }
781
    }
1✔
782
  }
783
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc