• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20232

02 Apr 2026 11:39AM UTC coverage: 88.784% (+0.03%) from 88.755%
#20232

push

github

web-flow
xds: Add configuration objects for ExtAuthz, GrpcService and Bootstrap changes for GrpcService (#12492)

This commit introduces configuration objects for the external
authorization (ExtAuthz) filter and the gRPC service and corresponding
translations from XDS proto and Bootstrap. These classes provide a
structured, immutable representation of the subset of the configuration
defined in the xDS protobuf messages.

This PR should mostly now (hopefully ) be compliant with
https://github.com/grpc/proposal/pull/510 but without
- CallCredentials (since I don't see A97) being implemented yet and
would prefer to do it in a followup , we return empty optional)
- TlsCredentials( since it's non trivial to construct a TLS credentials
object, we throw an exception)
- LocalCredentials(Java does't support these, we throw an exception)

The main new classes are:
- `ExtAuthzConfig`: Represents the configuration for the `ExtAuthz`
filter, including settings for the gRPC service, header mutation rules,
and other filter behaviors.
- `GrpcServiceConfig`: Represents the configuration for a gRPC service,
including the target URI, credentials, and other settings.
- `HeaderMutationRulesConfig`: Represents the configuration for header
mutation rules.
- `ChannelCredsConfig` and friends: To allow comparison between
credential configuration , to allow caching based on creds which'll be
needed in followup PRs for authz and proc.

The relevant sections of the spec are 
- GrpcService: https://github.com/grpc/proposal/pull/510
- ExtAuthz:
https://github.com/grpc/proposal/pull/481/files#diff-6bb76a24ad2fd8849f164244e68cd54eaR106-R190

This commit also includes parsers to create these configuration objects
from the corresponding protobuf messages, as well as unit tests for the
new classes.

35968 of 40512 relevant lines covered (88.78%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.77
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31

32
import com.google.common.annotations.VisibleForTesting;
33
import com.google.common.base.MoreObjects;
34
import io.grpc.Attributes;
35
import io.grpc.CallOptions;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.Codec;
39
import io.grpc.Compressor;
40
import io.grpc.CompressorRegistry;
41
import io.grpc.Context;
42
import io.grpc.Context.CancellationListener;
43
import io.grpc.Deadline;
44
import io.grpc.DecompressorRegistry;
45
import io.grpc.InternalConfigSelector;
46
import io.grpc.InternalDecompressorRegistry;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.MethodDescriptor.MethodType;
50
import io.grpc.Status;
51
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52
import io.perfmark.Link;
53
import io.perfmark.PerfMark;
54
import io.perfmark.Tag;
55
import io.perfmark.TaskCloseable;
56
import java.io.InputStream;
57
import java.nio.charset.Charset;
58
import java.util.Locale;
59
import java.util.concurrent.CancellationException;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.ScheduledExecutorService;
62
import java.util.concurrent.ScheduledFuture;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  private final MethodDescriptor<ReqT, RespT> method;
80
  private final Tag tag;
81
  private final Executor callExecutor;
82
  private final boolean callExecutorIsDirect;
83
  private final CallTracer channelCallsTracer;
84
  private final Context context;
85
  private CancellationHandler cancellationHandler;
86
  private final boolean unaryRequest;
87
  private CallOptions callOptions;
88
  private ClientStream stream;
89
  private boolean cancelCalled;
90
  private boolean halfCloseCalled;
91
  private final ClientStreamProvider clientStreamProvider;
92
  private final ScheduledExecutorService deadlineCancellationExecutor;
93
  private boolean fullStreamDecompression;
94
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
95
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
96

97
  ClientCallImpl(
98
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
99
      ClientStreamProvider clientStreamProvider,
100
      ScheduledExecutorService deadlineCancellationExecutor,
101
      CallTracer channelCallsTracer,
102
      // TODO(zdapeng): remove this arg
103
      @Nullable InternalConfigSelector configSelector) {
1✔
104
    this.method = method;
1✔
105
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
106
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
107
    // If we know that the executor is a direct executor, we don't need to wrap it with a
108
    // SerializingExecutor. This is purely for performance reasons.
109
    // See https://github.com/grpc/grpc-java/issues/368
110
    if (executor == directExecutor()) {
1✔
111
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
112
      callExecutorIsDirect = true;
1✔
113
    } else {
114
      this.callExecutor = new SerializingExecutor(executor);
1✔
115
      callExecutorIsDirect = false;
1✔
116
    }
117
    this.channelCallsTracer = channelCallsTracer;
1✔
118
    // Propagate the context from the thread which initiated the call to all callbacks.
119
    this.context = Context.current();
1✔
120
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
121
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
122
    this.callOptions = callOptions;
1✔
123
    this.clientStreamProvider = clientStreamProvider;
1✔
124
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
125
    PerfMark.event("ClientCall.<init>", tag);
1✔
126
  }
1✔
127

128
  /**
129
   * Provider of {@link ClientStream}s.
130
   */
131
  interface ClientStreamProvider {
132
    ClientStream newStream(
133
        MethodDescriptor<?, ?> method,
134
        CallOptions callOptions,
135
        Metadata headers,
136
        Context context);
137
  }
138

139
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140
    this.fullStreamDecompression = fullStreamDecompression;
1✔
141
    return this;
1✔
142
  }
143

144
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145
    this.decompressorRegistry = decompressorRegistry;
1✔
146
    return this;
1✔
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150
    this.compressorRegistry = compressorRegistry;
1✔
151
    return this;
1✔
152
  }
153

154
  @VisibleForTesting
155
  static void prepareHeaders(
156
      Metadata headers,
157
      DecompressorRegistry decompressorRegistry,
158
      Compressor compressor,
159
      boolean fullStreamDecompression) {
160
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
161
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
162
    if (compressor != Codec.Identity.NONE) {
1✔
163
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
164
    }
165

166
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
167
    byte[] advertisedEncodings =
1✔
168
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
169
    if (advertisedEncodings.length != 0) {
1✔
170
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
171
    }
172

173
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
174
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
175
    if (fullStreamDecompression) {
1✔
176
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void start(Listener<RespT> observer, Metadata headers) {
182
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
183
      PerfMark.attachTag(tag);
1✔
184
      startInternal(observer, headers);
1✔
185
    }
186
  }
1✔
187

188
  private void startInternal(Listener<RespT> observer, Metadata headers) {
189
    checkState(stream == null, "Already started");
1✔
190
    checkState(!cancelCalled, "call was cancelled");
1✔
191
    checkNotNull(observer, "observer");
1✔
192
    checkNotNull(headers, "headers");
1✔
193

194
    if (context.isCancelled()) {
1✔
195
      // Context is already cancelled so no need to create a real stream, just notify the observer
196
      // of cancellation via callback on the executor
197
      stream = NoopClientStream.INSTANCE;
1✔
198
      final Listener<RespT> finalObserver = observer;
1✔
199
      class ClosedByContext extends ContextRunnable {
200
        ClosedByContext() {
1✔
201
          super(context);
1✔
202
        }
1✔
203

204
        @Override
205
        public void runInContext() {
206
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
207
        }
1✔
208
      }
209

210
      callExecutor.execute(new ClosedByContext());
1✔
211
      return;
1✔
212
    }
213
    applyMethodConfig();
1✔
214
    final String compressorName = callOptions.getCompressor();
1✔
215
    Compressor compressor;
216
    if (compressorName != null) {
1✔
217
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
218
      if (compressor == null) {
1✔
219
        stream = NoopClientStream.INSTANCE;
×
220
        final Listener<RespT> finalObserver = observer;
×
221
        class ClosedByNotFoundCompressor extends ContextRunnable {
222
          ClosedByNotFoundCompressor() {
×
223
            super(context);
×
224
          }
×
225

226
          @Override
227
          public void runInContext() {
228
            closeObserver(
×
229
                finalObserver,
230
                Status.INTERNAL.withDescription(
×
231
                    String.format("Unable to find compressor by name %s", compressorName)),
×
232
                new Metadata());
233
          }
×
234
        }
235

236
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
237
        return;
×
238
      }
239
    } else {
240
      compressor = Codec.Identity.NONE;
1✔
241
    }
242
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
243

244
    Deadline effectiveDeadline = effectiveDeadline();
1✔
245
    boolean contextIsDeadlineSource = effectiveDeadline != null
1✔
246
        && effectiveDeadline.equals(context.getDeadline());
1✔
247
    cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
1✔
248
    boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
1✔
249
    if (!deadlineExceeded) {
1✔
250
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
251
    } else {
252
      ClientStreamTracer[] tracers =
1✔
253
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0,
1✔
254
              false, false);
255
      String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
1✔
256
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
257
      String description = String.format(
1✔
258
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
259
              + "Name resolution delay %.9f seconds.", deadlineName,
260
          cancellationHandler.remainingNanos / NANO_TO_SECS,
1✔
261
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
262
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
263
    }
264

265
    if (callExecutorIsDirect) {
1✔
266
      stream.optimizeForDirectExecutor();
1✔
267
    }
268
    if (callOptions.getAuthority() != null) {
1✔
269
      stream.setAuthority(callOptions.getAuthority());
1✔
270
    }
271
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
272
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
273
    }
274
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
275
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
276
    }
277
    if (effectiveDeadline != null) {
1✔
278
      stream.setDeadline(effectiveDeadline);
1✔
279
    }
280
    stream.setCompressor(compressor);
1✔
281
    if (fullStreamDecompression) {
1✔
282
      stream.setFullStreamDecompression(fullStreamDecompression);
×
283
    }
284
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
285
    channelCallsTracer.reportCallStarted();
1✔
286
    stream.start(new ClientStreamListenerImpl(observer));
1✔
287

288
    // Delay any sources of cancellation after start(), because most of the transports are broken if
289
    // they receive cancel before start. Issue #1343 has more details
290

291
    // Propagate later Context cancellation to the remote side.
292
    cancellationHandler.setUp();
1✔
293
  }
1✔
294

295
  private void applyMethodConfig() {
296
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
297
    if (info == null) {
1✔
298
      return;
1✔
299
    }
300
    if (info.timeoutNanos != null) {
1✔
301
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
302
      Deadline existingDeadline = callOptions.getDeadline();
1✔
303
      // If the new deadline is sooner than the existing deadline, swap them.
304
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
305
        callOptions = callOptions.withDeadline(newDeadline);
1✔
306
      }
307
    }
308
    if (info.waitForReady != null) {
1✔
309
      callOptions =
1✔
310
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
311
    }
312
    if (info.maxInboundMessageSize != null) {
1✔
313
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
314
      if (existingLimit != null) {
×
315
        callOptions =
×
316
            callOptions.withMaxInboundMessageSize(
×
317
                Math.min(existingLimit, info.maxInboundMessageSize));
×
318
      } else {
319
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
320
      }
321
    }
322
    if (info.maxOutboundMessageSize != null) {
1✔
323
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
324
      if (existingLimit != null) {
×
325
        callOptions =
×
326
            callOptions.withMaxOutboundMessageSize(
×
327
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
328
      } else {
329
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
330
      }
331
    }
332
  }
1✔
333

334
  private final class CancellationHandler implements Runnable, CancellationListener {
335
    private final boolean contextIsDeadlineSource;
336
    private final boolean hasDeadline;
337
    private final long remainingNanos;
338
    private volatile ScheduledFuture<?> deadlineCancellationFuture;
339
    private volatile boolean tearDownCalled;
340

341
    CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
1✔
342
      this.contextIsDeadlineSource = contextIsDeadlineSource;
1✔
343
      if (deadline == null) {
1✔
344
        hasDeadline = false;
1✔
345
        remainingNanos = 0;
1✔
346
      } else {
347
        hasDeadline = true;
1✔
348
        remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
349
      }
350
    }
1✔
351

352
    void setUp() {
353
      if (tearDownCalled) {
1✔
354
        return;
1✔
355
      }
356
      if (hasDeadline
1✔
357
          // If the context has the effective deadline, we don't need to schedule an extra task.
358
          && !contextIsDeadlineSource
359
          // If the channel has been terminated, we don't need to schedule an extra task.
360
          && deadlineCancellationExecutor != null) {
1✔
361
        deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
1✔
362
            new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
363
      }
364
      context.addListener(this, directExecutor());
1✔
365
      if (tearDownCalled) {
1✔
366
        // Race detected! Re-run to make sure the future is cancelled and context listener removed
367
        tearDown();
×
368
      }
369
    }
1✔
370

371
    // May be called multiple times, and race with setUp()
372
    void tearDown() {
373
      tearDownCalled = true;
1✔
374
      ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
1✔
375
      if (deadlineCancellationFuture != null) {
1✔
376
        deadlineCancellationFuture.cancel(false);
1✔
377
      }
378
      context.removeListener(this);
1✔
379
    }
1✔
380

381
    @Override
382
    public void cancelled(Context context) {
383
      if (hasDeadline && contextIsDeadlineSource
1✔
384
          && context.cancellationCause() instanceof TimeoutException) {
1✔
385
        stream.cancel(formatDeadlineExceededStatus());
1✔
386
        return;
1✔
387
      }
388
      stream.cancel(statusFromCancelled(context));
1✔
389
    }
1✔
390

391
    @Override
392
    public void run() {
393
      stream.cancel(formatDeadlineExceededStatus());
1✔
394
    }
1✔
395

396
    Status formatDeadlineExceededStatus() {
397
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
398
      // stream is created.
399
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
400
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
401

402
      StringBuilder buf = new StringBuilder();
1✔
403
      buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
1✔
404
      buf.append(" deadline exceeded after ");
1✔
405
      if (remainingNanos < 0) {
1✔
406
        buf.append('-');
1✔
407
      }
408
      buf.append(seconds);
1✔
409
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
410
      buf.append("s. ");
1✔
411
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
412
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
1✔
413
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
414
      if (stream != null) {
1✔
415
        InsightBuilder insight = new InsightBuilder();
1✔
416
        stream.appendTimeoutInsight(insight);
1✔
417
        buf.append(" ");
1✔
418
        buf.append(insight);
1✔
419
      }
420
      return DEADLINE_EXCEEDED.withDescription(buf.toString());
1✔
421
    }
422
  }
423

424
  @Nullable
425
  private Deadline effectiveDeadline() {
426
    // Call options and context are immutable, so we don't need to cache the deadline.
427
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
428
  }
429

430
  @Nullable
431
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
432
    if (deadline0 == null) {
1✔
433
      return deadline1;
1✔
434
    }
435
    if (deadline1 == null) {
1✔
436
      return deadline0;
1✔
437
    }
438
    return deadline0.minimum(deadline1);
1✔
439
  }
440

441
  @Override
442
  public void request(int numMessages) {
443
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
444
      PerfMark.attachTag(tag);
1✔
445
      checkState(stream != null, "Not started");
1✔
446
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
447
      stream.request(numMessages);
1✔
448
    }
449
  }
1✔
450

451
  @Override
452
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
453
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
454
      PerfMark.attachTag(tag);
1✔
455
      cancelInternal(message, cause);
1✔
456
    }
457
  }
1✔
458

459
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
460
    if (message == null && cause == null) {
1✔
461
      cause = new CancellationException("Cancelled without a message or cause");
×
462
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
463
    }
464
    if (cancelCalled) {
1✔
465
      return;
1✔
466
    }
467
    cancelCalled = true;
1✔
468
    try {
469
      // Cancel is called in exception handling cases, so it may be the case that the
470
      // stream was never successfully created or start has never been called.
471
      if (stream != null) {
1✔
472
        Status status = Status.CANCELLED;
1✔
473
        if (message != null) {
1✔
474
          status = status.withDescription(message);
1✔
475
        } else {
476
          status = status.withDescription("Call cancelled without message");
1✔
477
        }
478
        if (cause != null) {
1✔
479
          status = status.withCause(cause);
1✔
480
        }
481
        stream.cancel(status);
1✔
482
      }
483
    } finally {
484
      // start() might not have been called
485
      if (cancellationHandler != null) {
1✔
486
        cancellationHandler.tearDown();
1✔
487
      }
488
    }
489
  }
1✔
490

491
  @Override
492
  public void halfClose() {
493
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
494
      PerfMark.attachTag(tag);
1✔
495
      halfCloseInternal();
1✔
496
    }
497
  }
1✔
498

499
  private void halfCloseInternal() {
500
    checkState(stream != null, "Not started");
1✔
501
    checkState(!cancelCalled, "call was cancelled");
1✔
502
    checkState(!halfCloseCalled, "call already half-closed");
1✔
503
    halfCloseCalled = true;
1✔
504
    stream.halfClose();
1✔
505
  }
1✔
506

507
  @Override
508
  public void sendMessage(ReqT message) {
509
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
510
      PerfMark.attachTag(tag);
1✔
511
      sendMessageInternal(message);
1✔
512
    }
513
  }
1✔
514

515
  private void sendMessageInternal(ReqT message) {
516
    checkState(stream != null, "Not started");
1✔
517
    checkState(!cancelCalled, "call was cancelled");
1✔
518
    checkState(!halfCloseCalled, "call was half-closed");
1✔
519
    try {
520
      if (stream instanceof RetriableStream) {
1✔
521
        @SuppressWarnings("unchecked")
522
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
523
        retriableStream.sendMessage(message);
1✔
524
      } else {
1✔
525
        stream.writeMessage(method.streamRequest(message));
1✔
526
      }
527
    } catch (RuntimeException e) {
1✔
528
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
529
      return;
1✔
530
    } catch (Error e) {
×
531
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
532
      throw e;
×
533
    }
1✔
534
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
535
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
536
    // possibility of broken applications forgetting to call halfClose without noticing.
537
    if (!unaryRequest) {
1✔
538
      stream.flush();
1✔
539
    }
540
  }
1✔
541

542
  @Override
543
  public void setMessageCompression(boolean enabled) {
544
    checkState(stream != null, "Not started");
1✔
545
    stream.setMessageCompression(enabled);
1✔
546
  }
1✔
547

548
  @Override
549
  public boolean isReady() {
550
    if (halfCloseCalled) {
1✔
551
      return false;
1✔
552
    }
553
    return stream.isReady();
1✔
554
  }
555

556
  @Override
557
  public Attributes getAttributes() {
558
    if (stream != null) {
1✔
559
      return stream.getAttributes();
1✔
560
    }
561
    return Attributes.EMPTY;
1✔
562
  }
563

564
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
565
    try {
566
      observer.onClose(status, trailers);
1✔
567
    } catch (RuntimeException ex) {
1✔
568
      log.log(Level.WARNING, "Exception thrown by onClose() in ClientCall", ex);
1✔
569
    }
1✔
570
  }
1✔
571

572
  @Override
573
  public String toString() {
574
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
575
  }
576

577
  private class ClientStreamListenerImpl implements ClientStreamListener {
578
    private final Listener<RespT> observer;
579
    private Status exceptionStatus;
580

581
    public ClientStreamListenerImpl(Listener<RespT> observer) {
1✔
582
      this.observer = checkNotNull(observer, "observer");
1✔
583
    }
1✔
584

585
    /**
586
     * Cancels call and schedules onClose() notification. May only be called from the application
587
     * thread.
588
     */
589
    private void exceptionThrown(Status status) {
590
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
591
      // will be no further callbacks. We set the status here and overwrite the onClose() details
592
      // when it arrives.
593
      exceptionStatus = status;
1✔
594
      stream.cancel(status);
1✔
595
    }
1✔
596

597
    @Override
598
    public void headersRead(final Metadata headers) {
599
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
600
        PerfMark.attachTag(tag);
1✔
601
        final Link link = PerfMark.linkOut();
1✔
602
        final class HeadersRead extends ContextRunnable {
603
          HeadersRead() {
1✔
604
            super(context);
1✔
605
          }
1✔
606

607
          @Override
608
          public void runInContext() {
609
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
610
              PerfMark.attachTag(tag);
1✔
611
              PerfMark.linkIn(link);
1✔
612
              runInternal();
1✔
613
            }
614
          }
1✔
615

616
          private void runInternal() {
617
            if (exceptionStatus != null) {
1✔
618
              return;
×
619
            }
620
            try {
621
              observer.onHeaders(headers);
1✔
622
            } catch (Throwable t) {
1✔
623
              exceptionThrown(
1✔
624
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
625
            }
1✔
626
          }
1✔
627
        }
628

629
        callExecutor.execute(new HeadersRead());
1✔
630
      }
631
    }
1✔
632

633
    @Override
634
    public void messagesAvailable(final MessageProducer producer) {
635
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
636
        PerfMark.attachTag(tag);
1✔
637
        final Link link = PerfMark.linkOut();
1✔
638
        final class MessagesAvailable extends ContextRunnable {
639
          MessagesAvailable() {
1✔
640
            super(context);
1✔
641
          }
1✔
642

643
          @Override
644
          public void runInContext() {
645
            try (TaskCloseable ignore =
1✔
646
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
647
              PerfMark.attachTag(tag);
1✔
648
              PerfMark.linkIn(link);
1✔
649
              runInternal();
1✔
650
            }
651
          }
1✔
652

653
          private void runInternal() {
654
            if (exceptionStatus != null) {
1✔
655
              GrpcUtil.closeQuietly(producer);
×
656
              return;
×
657
            }
658
            try {
659
              InputStream message;
660
              while ((message = producer.next()) != null) {
1✔
661
                try {
662
                  observer.onMessage(method.parseResponse(message));
1✔
663
                } catch (Throwable t) {
1✔
664
                  GrpcUtil.closeQuietly(message);
1✔
665
                  throw t;
1✔
666
                }
1✔
667
                message.close();
1✔
668
              }
669
            } catch (Throwable t) {
1✔
670
              GrpcUtil.closeQuietly(producer);
1✔
671
              exceptionThrown(
1✔
672
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
673
            }
1✔
674
          }
1✔
675
        }
676

677
        callExecutor.execute(new MessagesAvailable());
1✔
678
      }
679
    }
1✔
680

681
    @Override
682
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
683
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
684
        PerfMark.attachTag(tag);
1✔
685
        closedInternal(status, rpcProgress, trailers);
1✔
686
      }
687
    }
1✔
688

689
    private void closedInternal(
690
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
691
      Deadline deadline = effectiveDeadline();
1✔
692
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
693
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
694
        // description. Since our timer may be delayed in firing, we double-check the deadline and
695
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
696
        if (deadline.isExpired()) {
1✔
697
          status = cancellationHandler.formatDeadlineExceededStatus();
1✔
698
          // Replace trailers to prevent mixing sources of status and trailers.
699
          trailers = new Metadata();
1✔
700
        }
701
      }
702
      final Status savedStatus = status;
1✔
703
      final Metadata savedTrailers = trailers;
1✔
704
      final Link link = PerfMark.linkOut();
1✔
705
      final class StreamClosed extends ContextRunnable {
706
        StreamClosed() {
1✔
707
          super(context);
1✔
708
        }
1✔
709

710
        @Override
711
        public void runInContext() {
712
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
713
            PerfMark.attachTag(tag);
1✔
714
            PerfMark.linkIn(link);
1✔
715
            runInternal();
1✔
716
          }
717
        }
1✔
718

719
        private void runInternal() {
720
          cancellationHandler.tearDown();
1✔
721
          Status status = savedStatus;
1✔
722
          Metadata trailers = savedTrailers;
1✔
723
          if (exceptionStatus != null) {
1✔
724
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
725
            // However the cancel is racy and this closed() may have already been queued when the
726
            // cancellation occurred. Since other calls like onMessage() will throw away data if
727
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
728
            // provided by the server.
729
            status = exceptionStatus;
1✔
730
            // Replace trailers to prevent mixing sources of status and trailers.
731
            trailers = new Metadata();
1✔
732
          }
733
          try {
734
            closeObserver(observer, status, trailers);
1✔
735
          } finally {
736
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
737
          }
738
        }
1✔
739
      }
740

741
      callExecutor.execute(new StreamClosed());
1✔
742
    }
1✔
743

744
    @Override
745
    public void onReady() {
746
      if (method.getType().clientSendsOneMessage()) {
1✔
747
        return;
1✔
748
      }
749
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
750
        PerfMark.attachTag(tag);
1✔
751
        final Link link = PerfMark.linkOut();
1✔
752

753
        final class StreamOnReady extends ContextRunnable {
754
          StreamOnReady() {
1✔
755
            super(context);
1✔
756
          }
1✔
757

758
          @Override
759
          public void runInContext() {
760
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
761
              PerfMark.attachTag(tag);
1✔
762
              PerfMark.linkIn(link);
1✔
763
              runInternal();
1✔
764
            }
765
          }
1✔
766

767
          private void runInternal() {
768
            if (exceptionStatus != null) {
1✔
769
              return;
×
770
            }
771
            try {
772
              observer.onReady();
1✔
773
            } catch (Throwable t) {
1✔
774
              exceptionThrown(
1✔
775
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
776
            }
1✔
777
          }
1✔
778
        }
779

780
        callExecutor.execute(new StreamOnReady());
1✔
781
      }
782
    }
1✔
783
  }
784
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc