• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19107

19 Mar 2024 05:06PM UTC coverage: 88.279% (-0.04%) from 88.315%
#19107

push

github

web-flow
Revert "core: Provide DEADLINE_EXCEEDED insights for context deadline" (#11024)

This reverts commit 0e31ac930.

31160 of 35297 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.89
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31
import static java.lang.Math.max;
32

33
import com.google.common.annotations.VisibleForTesting;
34
import com.google.common.base.MoreObjects;
35
import io.grpc.Attributes;
36
import io.grpc.CallOptions;
37
import io.grpc.ClientCall;
38
import io.grpc.ClientStreamTracer;
39
import io.grpc.Codec;
40
import io.grpc.Compressor;
41
import io.grpc.CompressorRegistry;
42
import io.grpc.Context;
43
import io.grpc.Context.CancellationListener;
44
import io.grpc.Deadline;
45
import io.grpc.DecompressorRegistry;
46
import io.grpc.InternalConfigSelector;
47
import io.grpc.InternalDecompressorRegistry;
48
import io.grpc.Metadata;
49
import io.grpc.MethodDescriptor;
50
import io.grpc.MethodDescriptor.MethodType;
51
import io.grpc.Status;
52
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
53
import io.perfmark.Link;
54
import io.perfmark.PerfMark;
55
import io.perfmark.Tag;
56
import io.perfmark.TaskCloseable;
57
import java.io.InputStream;
58
import java.nio.charset.Charset;
59
import java.util.Locale;
60
import java.util.concurrent.CancellationException;
61
import java.util.concurrent.Executor;
62
import java.util.concurrent.ScheduledExecutorService;
63
import java.util.concurrent.ScheduledFuture;
64
import java.util.concurrent.TimeUnit;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  private final MethodDescriptor<ReqT, RespT> method;
80
  private final Tag tag;
81
  private final Executor callExecutor;
82
  private final boolean callExecutorIsDirect;
83
  private final CallTracer channelCallsTracer;
84
  private final Context context;
85
  private volatile ScheduledFuture<?> deadlineCancellationFuture;
86
  private final boolean unaryRequest;
87
  private CallOptions callOptions;
88
  private ClientStream stream;
89
  private volatile boolean cancelListenersShouldBeRemoved;
90
  private boolean cancelCalled;
91
  private boolean halfCloseCalled;
92
  private final ClientStreamProvider clientStreamProvider;
93
  private final ContextCancellationListener cancellationListener =
1✔
94
      new ContextCancellationListener();
95
  private final ScheduledExecutorService deadlineCancellationExecutor;
96
  private boolean fullStreamDecompression;
97
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
98
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
99

100
  ClientCallImpl(
101
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
102
      ClientStreamProvider clientStreamProvider,
103
      ScheduledExecutorService deadlineCancellationExecutor,
104
      CallTracer channelCallsTracer,
105
      // TODO(zdapeng): remove this arg
106
      @Nullable InternalConfigSelector configSelector) {
1✔
107
    this.method = method;
1✔
108
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
109
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
110
    // If we know that the executor is a direct executor, we don't need to wrap it with a
111
    // SerializingExecutor. This is purely for performance reasons.
112
    // See https://github.com/grpc/grpc-java/issues/368
113
    if (executor == directExecutor()) {
1✔
114
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
115
      callExecutorIsDirect = true;
1✔
116
    } else {
117
      this.callExecutor = new SerializingExecutor(executor);
1✔
118
      callExecutorIsDirect = false;
1✔
119
    }
120
    this.channelCallsTracer = channelCallsTracer;
1✔
121
    // Propagate the context from the thread which initiated the call to all callbacks.
122
    this.context = Context.current();
1✔
123
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
124
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
125
    this.callOptions = callOptions;
1✔
126
    this.clientStreamProvider = clientStreamProvider;
1✔
127
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
128
    PerfMark.event("ClientCall.<init>", tag);
1✔
129
  }
1✔
130

131
  private final class ContextCancellationListener implements CancellationListener {
1✔
132
    @Override
133
    public void cancelled(Context context) {
134
      stream.cancel(statusFromCancelled(context));
1✔
135
    }
1✔
136
  }
137

138
  /**
139
   * Provider of {@link ClientStream}s.
140
   */
141
  interface ClientStreamProvider {
142
    ClientStream newStream(
143
        MethodDescriptor<?, ?> method,
144
        CallOptions callOptions,
145
        Metadata headers,
146
        Context context);
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
150
    this.fullStreamDecompression = fullStreamDecompression;
1✔
151
    return this;
1✔
152
  }
153

154
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
155
    this.decompressorRegistry = decompressorRegistry;
1✔
156
    return this;
1✔
157
  }
158

159
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
160
    this.compressorRegistry = compressorRegistry;
1✔
161
    return this;
1✔
162
  }
163

164
  @VisibleForTesting
165
  static void prepareHeaders(
166
      Metadata headers,
167
      DecompressorRegistry decompressorRegistry,
168
      Compressor compressor,
169
      boolean fullStreamDecompression) {
170
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
171
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
172
    if (compressor != Codec.Identity.NONE) {
1✔
173
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
174
    }
175

176
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
177
    byte[] advertisedEncodings =
1✔
178
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
179
    if (advertisedEncodings.length != 0) {
1✔
180
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
181
    }
182

183
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
184
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
185
    if (fullStreamDecompression) {
1✔
186
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
187
    }
188
  }
1✔
189

190
  @Override
191
  public void start(Listener<RespT> observer, Metadata headers) {
192
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
193
      PerfMark.attachTag(tag);
1✔
194
      startInternal(observer, headers);
1✔
195
    }
196
  }
1✔
197

198
  private void startInternal(Listener<RespT> observer, Metadata headers) {
199
    checkState(stream == null, "Already started");
1✔
200
    checkState(!cancelCalled, "call was cancelled");
1✔
201
    checkNotNull(observer, "observer");
1✔
202
    checkNotNull(headers, "headers");
1✔
203

204
    if (context.isCancelled()) {
1✔
205
      // Context is already cancelled so no need to create a real stream, just notify the observer
206
      // of cancellation via callback on the executor
207
      stream = NoopClientStream.INSTANCE;
1✔
208
      final Listener<RespT> finalObserver = observer;
1✔
209
      class ClosedByContext extends ContextRunnable {
210
        ClosedByContext() {
1✔
211
          super(context);
1✔
212
        }
1✔
213

214
        @Override
215
        public void runInContext() {
216
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
217
        }
1✔
218
      }
219

220
      callExecutor.execute(new ClosedByContext());
1✔
221
      return;
1✔
222
    }
223
    applyMethodConfig();
1✔
224
    final String compressorName = callOptions.getCompressor();
1✔
225
    Compressor compressor;
226
    if (compressorName != null) {
1✔
227
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
228
      if (compressor == null) {
1✔
229
        stream = NoopClientStream.INSTANCE;
×
230
        final Listener<RespT> finalObserver = observer;
×
231
        class ClosedByNotFoundCompressor extends ContextRunnable {
232
          ClosedByNotFoundCompressor() {
×
233
            super(context);
×
234
          }
×
235

236
          @Override
237
          public void runInContext() {
238
            closeObserver(
×
239
                finalObserver,
240
                Status.INTERNAL.withDescription(
×
241
                    String.format("Unable to find compressor by name %s", compressorName)),
×
242
                new Metadata());
243
          }
×
244
        }
245

246
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
247
        return;
×
248
      }
249
    } else {
250
      compressor = Codec.Identity.NONE;
1✔
251
    }
252
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
253

254
    Deadline effectiveDeadline = effectiveDeadline();
1✔
255
    boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
1✔
256
    if (!deadlineExceeded) {
1✔
257
      logIfContextNarrowedTimeout(
1✔
258
          effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
1✔
259
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
260
    } else {
261
      ClientStreamTracer[] tracers =
1✔
262
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
1✔
263
      String deadlineName =
264
          isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context";
1✔
265
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
266
      String description = String.format(
1✔
267
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
268
              + "Name resolution delay %.9f seconds.", deadlineName,
269
          effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS,
1✔
270
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
271
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
272
    }
273

274
    if (callExecutorIsDirect) {
1✔
275
      stream.optimizeForDirectExecutor();
1✔
276
    }
277
    if (callOptions.getAuthority() != null) {
1✔
278
      stream.setAuthority(callOptions.getAuthority());
1✔
279
    }
280
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
281
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
282
    }
283
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
284
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
285
    }
286
    if (effectiveDeadline != null) {
1✔
287
      stream.setDeadline(effectiveDeadline);
1✔
288
    }
289
    stream.setCompressor(compressor);
1✔
290
    if (fullStreamDecompression) {
1✔
291
      stream.setFullStreamDecompression(fullStreamDecompression);
×
292
    }
293
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
294
    channelCallsTracer.reportCallStarted();
1✔
295
    stream.start(new ClientStreamListenerImpl(observer));
1✔
296

297
    // Delay any sources of cancellation after start(), because most of the transports are broken if
298
    // they receive cancel before start. Issue #1343 has more details
299

300
    // Propagate later Context cancellation to the remote side.
301
    context.addListener(cancellationListener, directExecutor());
1✔
302
    if (effectiveDeadline != null
1✔
303
        // If the context has the effective deadline, we don't need to schedule an extra task.
304
        && !effectiveDeadline.equals(context.getDeadline())
1✔
305
        // If the channel has been terminated, we don't need to schedule an extra task.
306
        && deadlineCancellationExecutor != null) {
307
      deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
1✔
308
    }
309
    if (cancelListenersShouldBeRemoved) {
1✔
310
      // Race detected! ClientStreamListener.closed may have been called before
311
      // deadlineCancellationFuture was set / context listener added, thereby preventing the future
312
      // and listener from being cancelled. Go ahead and cancel again, just to be sure it
313
      // was cancelled.
314
      removeContextListenerAndCancelDeadlineFuture();
1✔
315
    }
316
  }
1✔
317

318
  private void applyMethodConfig() {
319
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
320
    if (info == null) {
1✔
321
      return;
1✔
322
    }
323
    if (info.timeoutNanos != null) {
1✔
324
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
325
      Deadline existingDeadline = callOptions.getDeadline();
1✔
326
      // If the new deadline is sooner than the existing deadline, swap them.
327
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
328
        callOptions = callOptions.withDeadline(newDeadline);
1✔
329
      }
330
    }
331
    if (info.waitForReady != null) {
1✔
332
      callOptions =
1✔
333
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
334
    }
335
    if (info.maxInboundMessageSize != null) {
1✔
336
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
337
      if (existingLimit != null) {
×
338
        callOptions =
×
339
            callOptions.withMaxInboundMessageSize(
×
340
                Math.min(existingLimit, info.maxInboundMessageSize));
×
341
      } else {
342
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
343
      }
344
    }
345
    if (info.maxOutboundMessageSize != null) {
1✔
346
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
347
      if (existingLimit != null) {
×
348
        callOptions =
×
349
            callOptions.withMaxOutboundMessageSize(
×
350
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
351
      } else {
352
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
353
      }
354
    }
355
  }
1✔
356

357
  private static void logIfContextNarrowedTimeout(
358
      Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
359
      @Nullable Deadline callDeadline) {
360
    if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
1✔
361
        || !effectiveDeadline.equals(outerCallDeadline)) {
×
362
      return;
1✔
363
    }
364

365
    long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
×
366
    StringBuilder builder = new StringBuilder(String.format(
×
367
        Locale.US,
368
        "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
×
369
    if (callDeadline == null) {
×
370
      builder.append(" Explicit call timeout was not set.");
×
371
    } else {
372
      long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
×
373
      builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
×
374
    }
375

376
    log.fine(builder.toString());
×
377
  }
×
378

379
  private void removeContextListenerAndCancelDeadlineFuture() {
380
    context.removeListener(cancellationListener);
1✔
381
    ScheduledFuture<?> f = deadlineCancellationFuture;
1✔
382
    if (f != null) {
1✔
383
      f.cancel(false);
1✔
384
    }
385
  }
1✔
386

387
  private class DeadlineTimer implements Runnable {
388
    private final long remainingNanos;
389

390
    DeadlineTimer(long remainingNanos) {
1✔
391
      this.remainingNanos = remainingNanos;
1✔
392
    }
1✔
393

394
    @Override
395
    public void run() {
396
      InsightBuilder insight = new InsightBuilder();
1✔
397
      stream.appendTimeoutInsight(insight);
1✔
398
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
399
      // stream is created.
400
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
401
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
402

403
      StringBuilder buf = new StringBuilder();
1✔
404
      buf.append("deadline exceeded after ");
1✔
405
      if (remainingNanos < 0) {
1✔
406
        buf.append('-');
1✔
407
      }
408
      buf.append(seconds);
1✔
409
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
410
      buf.append("s. ");
1✔
411
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
412
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ",
1✔
413
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
414
      buf.append(insight);
1✔
415
      stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
1✔
416
    }
1✔
417
  }
418

419
  private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
420
    long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
421
    return deadlineCancellationExecutor.schedule(
1✔
422
        new LogExceptionRunnable(
423
            new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
424
  }
425

426
  @Nullable
427
  private Deadline effectiveDeadline() {
428
    // Call options and context are immutable, so we don't need to cache the deadline.
429
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
430
  }
431

432
  @Nullable
433
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
434
    if (deadline0 == null) {
1✔
435
      return deadline1;
1✔
436
    }
437
    if (deadline1 == null) {
1✔
438
      return deadline0;
1✔
439
    }
440
    return deadline0.minimum(deadline1);
1✔
441
  }
442

443
  private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
444
    if (deadline0 == null) {
1✔
445
      return false;
×
446
    }
447
    if (deadline1 == null) {
1✔
448
      return true;
1✔
449
    }
450
    return deadline0.isBefore(deadline1);
×
451
  }
452

453
  @Override
454
  public void request(int numMessages) {
455
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
456
      PerfMark.attachTag(tag);
1✔
457
      checkState(stream != null, "Not started");
1✔
458
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
459
      stream.request(numMessages);
1✔
460
    }
461
  }
1✔
462

463
  @Override
464
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
465
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
466
      PerfMark.attachTag(tag);
1✔
467
      cancelInternal(message, cause);
1✔
468
    }
469
  }
1✔
470

471
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
472
    if (message == null && cause == null) {
1✔
473
      cause = new CancellationException("Cancelled without a message or cause");
×
474
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
475
    }
476
    if (cancelCalled) {
1✔
477
      return;
1✔
478
    }
479
    cancelCalled = true;
1✔
480
    try {
481
      // Cancel is called in exception handling cases, so it may be the case that the
482
      // stream was never successfully created or start has never been called.
483
      if (stream != null) {
1✔
484
        Status status = Status.CANCELLED;
1✔
485
        if (message != null) {
1✔
486
          status = status.withDescription(message);
1✔
487
        } else {
488
          status = status.withDescription("Call cancelled without message");
1✔
489
        }
490
        if (cause != null) {
1✔
491
          status = status.withCause(cause);
1✔
492
        }
493
        stream.cancel(status);
1✔
494
      }
495
    } finally {
496
      removeContextListenerAndCancelDeadlineFuture();
1✔
497
    }
498
  }
1✔
499

500
  @Override
501
  public void halfClose() {
502
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
503
      PerfMark.attachTag(tag);
1✔
504
      halfCloseInternal();
1✔
505
    }
506
  }
1✔
507

508
  private void halfCloseInternal() {
509
    checkState(stream != null, "Not started");
1✔
510
    checkState(!cancelCalled, "call was cancelled");
1✔
511
    checkState(!halfCloseCalled, "call already half-closed");
1✔
512
    halfCloseCalled = true;
1✔
513
    stream.halfClose();
1✔
514
  }
1✔
515

516
  @Override
517
  public void sendMessage(ReqT message) {
518
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
519
      PerfMark.attachTag(tag);
1✔
520
      sendMessageInternal(message);
1✔
521
    }
522
  }
1✔
523

524
  private void sendMessageInternal(ReqT message) {
525
    checkState(stream != null, "Not started");
1✔
526
    checkState(!cancelCalled, "call was cancelled");
1✔
527
    checkState(!halfCloseCalled, "call was half-closed");
1✔
528
    try {
529
      if (stream instanceof RetriableStream) {
1✔
530
        @SuppressWarnings("unchecked")
531
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
532
        retriableStream.sendMessage(message);
1✔
533
      } else {
1✔
534
        stream.writeMessage(method.streamRequest(message));
1✔
535
      }
536
    } catch (RuntimeException e) {
1✔
537
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
538
      return;
1✔
539
    } catch (Error e) {
×
540
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
541
      throw e;
×
542
    }
1✔
543
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
544
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
545
    // possibility of broken applications forgetting to call halfClose without noticing.
546
    if (!unaryRequest) {
1✔
547
      stream.flush();
1✔
548
    }
549
  }
1✔
550

551
  @Override
552
  public void setMessageCompression(boolean enabled) {
553
    checkState(stream != null, "Not started");
1✔
554
    stream.setMessageCompression(enabled);
1✔
555
  }
1✔
556

557
  @Override
558
  public boolean isReady() {
559
    if (halfCloseCalled) {
1✔
560
      return false;
1✔
561
    }
562
    return stream.isReady();
1✔
563
  }
564

565
  @Override
566
  public Attributes getAttributes() {
567
    if (stream != null) {
1✔
568
      return stream.getAttributes();
1✔
569
    }
570
    return Attributes.EMPTY;
1✔
571
  }
572

573
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
574
    observer.onClose(status, trailers);
1✔
575
  }
1✔
576

577
  @Override
578
  public String toString() {
579
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
580
  }
581

582
  private class ClientStreamListenerImpl implements ClientStreamListener {
583
    private final Listener<RespT> observer;
584
    private Status exceptionStatus;
585

586
    public ClientStreamListenerImpl(Listener<RespT> observer) {
1✔
587
      this.observer = checkNotNull(observer, "observer");
1✔
588
    }
1✔
589

590
    /**
591
     * Cancels call and schedules onClose() notification. May only be called from the application
592
     * thread.
593
     */
594
    private void exceptionThrown(Status status) {
595
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
596
      // will be no further callbacks. We set the status here and overwrite the onClose() details
597
      // when it arrives.
598
      exceptionStatus = status;
1✔
599
      stream.cancel(status);
1✔
600
    }
1✔
601

602
    @Override
603
    public void headersRead(final Metadata headers) {
604
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
605
        PerfMark.attachTag(tag);
1✔
606
        final Link link = PerfMark.linkOut();
1✔
607
        final class HeadersRead extends ContextRunnable {
608
          HeadersRead() {
1✔
609
            super(context);
1✔
610
          }
1✔
611

612
          @Override
613
          public void runInContext() {
614
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
615
              PerfMark.attachTag(tag);
1✔
616
              PerfMark.linkIn(link);
1✔
617
              runInternal();
1✔
618
            }
619
          }
1✔
620

621
          private void runInternal() {
622
            if (exceptionStatus != null) {
1✔
623
              return;
×
624
            }
625
            try {
626
              observer.onHeaders(headers);
1✔
627
            } catch (Throwable t) {
1✔
628
              exceptionThrown(
1✔
629
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
630
            }
1✔
631
          }
1✔
632
        }
633

634
        callExecutor.execute(new HeadersRead());
1✔
635
      }
636
    }
1✔
637

638
    @Override
639
    public void messagesAvailable(final MessageProducer producer) {
640
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
641
        PerfMark.attachTag(tag);
1✔
642
        final Link link = PerfMark.linkOut();
1✔
643
        final class MessagesAvailable extends ContextRunnable {
644
          MessagesAvailable() {
1✔
645
            super(context);
1✔
646
          }
1✔
647

648
          @Override
649
          public void runInContext() {
650
            try (TaskCloseable ignore =
1✔
651
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
652
              PerfMark.attachTag(tag);
1✔
653
              PerfMark.linkIn(link);
1✔
654
              runInternal();
1✔
655
            }
656
          }
1✔
657

658
          private void runInternal() {
659
            if (exceptionStatus != null) {
1✔
660
              GrpcUtil.closeQuietly(producer);
×
661
              return;
×
662
            }
663
            try {
664
              InputStream message;
665
              while ((message = producer.next()) != null) {
1✔
666
                try {
667
                  observer.onMessage(method.parseResponse(message));
1✔
668
                } catch (Throwable t) {
1✔
669
                  GrpcUtil.closeQuietly(message);
1✔
670
                  throw t;
1✔
671
                }
1✔
672
                message.close();
1✔
673
              }
674
            } catch (Throwable t) {
1✔
675
              GrpcUtil.closeQuietly(producer);
1✔
676
              exceptionThrown(
1✔
677
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
678
            }
1✔
679
          }
1✔
680
        }
681

682
        callExecutor.execute(new MessagesAvailable());
1✔
683
      }
684
    }
1✔
685

686
    @Override
687
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
688
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
689
        PerfMark.attachTag(tag);
1✔
690
        closedInternal(status, rpcProgress, trailers);
1✔
691
      }
692
    }
1✔
693

694
    private void closedInternal(
695
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
696
      Deadline deadline = effectiveDeadline();
1✔
697
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
698
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
699
        // description. Since our timer may be delayed in firing, we double-check the deadline and
700
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
701
        if (deadline.isExpired()) {
1✔
702
          InsightBuilder insight = new InsightBuilder();
1✔
703
          stream.appendTimeoutInsight(insight);
1✔
704
          status = DEADLINE_EXCEEDED.augmentDescription(
1✔
705
              "ClientCall was cancelled at or after deadline. " + insight);
706
          // Replace trailers to prevent mixing sources of status and trailers.
707
          trailers = new Metadata();
1✔
708
        }
709
      }
710
      final Status savedStatus = status;
1✔
711
      final Metadata savedTrailers = trailers;
1✔
712
      final Link link = PerfMark.linkOut();
1✔
713
      final class StreamClosed extends ContextRunnable {
714
        StreamClosed() {
1✔
715
          super(context);
1✔
716
        }
1✔
717

718
        @Override
719
        public void runInContext() {
720
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
721
            PerfMark.attachTag(tag);
1✔
722
            PerfMark.linkIn(link);
1✔
723
            runInternal();
1✔
724
          }
725
        }
1✔
726

727
        private void runInternal() {
728
          Status status = savedStatus;
1✔
729
          Metadata trailers = savedTrailers;
1✔
730
          if (exceptionStatus != null) {
1✔
731
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
732
            // However the cancel is racy and this closed() may have already been queued when the
733
            // cancellation occurred. Since other calls like onMessage() will throw away data if
734
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
735
            // provided by the server.
736
            status = exceptionStatus;
1✔
737
            // Replace trailers to prevent mixing sources of status and trailers.
738
            trailers = new Metadata();
1✔
739
          }
740
          cancelListenersShouldBeRemoved = true;
1✔
741
          try {
742
            closeObserver(observer, status, trailers);
1✔
743
          } finally {
744
            removeContextListenerAndCancelDeadlineFuture();
1✔
745
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
746
          }
747
        }
1✔
748
      }
749

750
      callExecutor.execute(new StreamClosed());
1✔
751
    }
1✔
752

753
    @Override
754
    public void onReady() {
755
      if (method.getType().clientSendsOneMessage()) {
1✔
756
        return;
1✔
757
      }
758
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
759
        PerfMark.attachTag(tag);
1✔
760
        final Link link = PerfMark.linkOut();
1✔
761

762
        final class StreamOnReady extends ContextRunnable {
763
          StreamOnReady() {
1✔
764
            super(context);
1✔
765
          }
1✔
766

767
          @Override
768
          public void runInContext() {
769
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
770
              PerfMark.attachTag(tag);
1✔
771
              PerfMark.linkIn(link);
1✔
772
              runInternal();
1✔
773
            }
774
          }
1✔
775

776
          private void runInternal() {
777
            if (exceptionStatus != null) {
1✔
778
              return;
×
779
            }
780
            try {
781
              observer.onReady();
1✔
782
            } catch (Throwable t) {
1✔
783
              exceptionThrown(
1✔
784
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
785
            }
1✔
786
          }
1✔
787
        }
788

789
        callExecutor.execute(new StreamOnReady());
1✔
790
      }
791
    }
1✔
792
  }
793
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc