• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19103

15 Mar 2024 05:44PM UTC coverage: 88.319% (+0.06%) from 88.259%
#19103

push

github

ejona86
core: Provide DEADLINE_EXCEEDED insights for context deadline

We provided extra details when the RPC is killed by CallOptions'
Deadline, but didn't do the same for Context.

To avoid duplicating code, things were restructured, including the
threading. There are more code flows now, but I think the
multi-threading came out more obvious and less error-prone. I didn't
change the status when the deadline is already expired, because the
text is shared with DelayedClientCall and AbstractInteropTest doesn't
distinguish between the two cases.

As seen at b/300991330

31167 of 35289 relevant lines covered (88.32%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.93
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31

32
import com.google.common.annotations.VisibleForTesting;
33
import com.google.common.base.MoreObjects;
34
import io.grpc.Attributes;
35
import io.grpc.CallOptions;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.Codec;
39
import io.grpc.Compressor;
40
import io.grpc.CompressorRegistry;
41
import io.grpc.Context;
42
import io.grpc.Context.CancellationListener;
43
import io.grpc.Deadline;
44
import io.grpc.DecompressorRegistry;
45
import io.grpc.InternalConfigSelector;
46
import io.grpc.InternalDecompressorRegistry;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.MethodDescriptor.MethodType;
50
import io.grpc.Status;
51
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52
import io.perfmark.Link;
53
import io.perfmark.PerfMark;
54
import io.perfmark.Tag;
55
import io.perfmark.TaskCloseable;
56
import java.io.InputStream;
57
import java.nio.charset.Charset;
58
import java.util.Locale;
59
import java.util.concurrent.CancellationException;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.ScheduledExecutorService;
62
import java.util.concurrent.ScheduledFuture;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  private final MethodDescriptor<ReqT, RespT> method;
80
  private final Tag tag;
81
  private final Executor callExecutor;
82
  private final boolean callExecutorIsDirect;
83
  private final CallTracer channelCallsTracer;
84
  private final Context context;
85
  private CancellationHandler cancellationHandler;
86
  private final boolean unaryRequest;
87
  private CallOptions callOptions;
88
  private ClientStream stream;
89
  private boolean cancelCalled;
90
  private boolean halfCloseCalled;
91
  private final ClientStreamProvider clientStreamProvider;
92
  private final ScheduledExecutorService deadlineCancellationExecutor;
93
  private boolean fullStreamDecompression;
94
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
95
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
96

97
  ClientCallImpl(
98
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
99
      ClientStreamProvider clientStreamProvider,
100
      ScheduledExecutorService deadlineCancellationExecutor,
101
      CallTracer channelCallsTracer,
102
      // TODO(zdapeng): remove this arg
103
      @Nullable InternalConfigSelector configSelector) {
1✔
104
    this.method = method;
1✔
105
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
106
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
107
    // If we know that the executor is a direct executor, we don't need to wrap it with a
108
    // SerializingExecutor. This is purely for performance reasons.
109
    // See https://github.com/grpc/grpc-java/issues/368
110
    if (executor == directExecutor()) {
1✔
111
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
112
      callExecutorIsDirect = true;
1✔
113
    } else {
114
      this.callExecutor = new SerializingExecutor(executor);
1✔
115
      callExecutorIsDirect = false;
1✔
116
    }
117
    this.channelCallsTracer = channelCallsTracer;
1✔
118
    // Propagate the context from the thread which initiated the call to all callbacks.
119
    this.context = Context.current();
1✔
120
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
121
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
122
    this.callOptions = callOptions;
1✔
123
    this.clientStreamProvider = clientStreamProvider;
1✔
124
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
125
    PerfMark.event("ClientCall.<init>", tag);
1✔
126
  }
1✔
127

128
  /**
129
   * Provider of {@link ClientStream}s.
130
   */
131
  interface ClientStreamProvider {
132
    ClientStream newStream(
133
        MethodDescriptor<?, ?> method,
134
        CallOptions callOptions,
135
        Metadata headers,
136
        Context context);
137
  }
138

139
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140
    this.fullStreamDecompression = fullStreamDecompression;
1✔
141
    return this;
1✔
142
  }
143

144
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145
    this.decompressorRegistry = decompressorRegistry;
1✔
146
    return this;
1✔
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150
    this.compressorRegistry = compressorRegistry;
1✔
151
    return this;
1✔
152
  }
153

154
  @VisibleForTesting
155
  static void prepareHeaders(
156
      Metadata headers,
157
      DecompressorRegistry decompressorRegistry,
158
      Compressor compressor,
159
      boolean fullStreamDecompression) {
160
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
161
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
162
    if (compressor != Codec.Identity.NONE) {
1✔
163
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
164
    }
165

166
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
167
    byte[] advertisedEncodings =
1✔
168
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
169
    if (advertisedEncodings.length != 0) {
1✔
170
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
171
    }
172

173
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
174
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
175
    if (fullStreamDecompression) {
1✔
176
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void start(Listener<RespT> observer, Metadata headers) {
182
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
183
      PerfMark.attachTag(tag);
1✔
184
      startInternal(observer, headers);
1✔
185
    }
186
  }
1✔
187

188
  private void startInternal(Listener<RespT> observer, Metadata headers) {
189
    checkState(stream == null, "Already started");
1✔
190
    checkState(!cancelCalled, "call was cancelled");
1✔
191
    checkNotNull(observer, "observer");
1✔
192
    checkNotNull(headers, "headers");
1✔
193

194
    if (context.isCancelled()) {
1✔
195
      // Context is already cancelled so no need to create a real stream, just notify the observer
196
      // of cancellation via callback on the executor
197
      stream = NoopClientStream.INSTANCE;
1✔
198
      final Listener<RespT> finalObserver = observer;
1✔
199
      class ClosedByContext extends ContextRunnable {
200
        ClosedByContext() {
1✔
201
          super(context);
1✔
202
        }
1✔
203

204
        @Override
205
        public void runInContext() {
206
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
207
        }
1✔
208
      }
209

210
      callExecutor.execute(new ClosedByContext());
1✔
211
      return;
1✔
212
    }
213
    applyMethodConfig();
1✔
214
    final String compressorName = callOptions.getCompressor();
1✔
215
    Compressor compressor;
216
    if (compressorName != null) {
1✔
217
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
218
      if (compressor == null) {
1✔
219
        stream = NoopClientStream.INSTANCE;
×
220
        final Listener<RespT> finalObserver = observer;
×
221
        class ClosedByNotFoundCompressor extends ContextRunnable {
222
          ClosedByNotFoundCompressor() {
×
223
            super(context);
×
224
          }
×
225

226
          @Override
227
          public void runInContext() {
228
            closeObserver(
×
229
                finalObserver,
230
                Status.INTERNAL.withDescription(
×
231
                    String.format("Unable to find compressor by name %s", compressorName)),
×
232
                new Metadata());
233
          }
×
234
        }
235

236
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
237
        return;
×
238
      }
239
    } else {
240
      compressor = Codec.Identity.NONE;
1✔
241
    }
242
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
243

244
    Deadline effectiveDeadline = effectiveDeadline();
1✔
245
    boolean contextIsDeadlineSource = effectiveDeadline != null
1✔
246
        && effectiveDeadline.equals(context.getDeadline());
1✔
247
    cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
1✔
248
    boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
1✔
249
    if (!deadlineExceeded) {
1✔
250
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
251
    } else {
252
      ClientStreamTracer[] tracers =
1✔
253
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
1✔
254
      String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
1✔
255
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
256
      String description = String.format(
1✔
257
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
258
              + "Name resolution delay %.9f seconds.", deadlineName,
259
          cancellationHandler.remainingNanos / NANO_TO_SECS,
1✔
260
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
261
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
262
    }
263

264
    if (callExecutorIsDirect) {
1✔
265
      stream.optimizeForDirectExecutor();
1✔
266
    }
267
    if (callOptions.getAuthority() != null) {
1✔
268
      stream.setAuthority(callOptions.getAuthority());
1✔
269
    }
270
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
271
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
272
    }
273
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
274
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
275
    }
276
    if (effectiveDeadline != null) {
1✔
277
      stream.setDeadline(effectiveDeadline);
1✔
278
    }
279
    stream.setCompressor(compressor);
1✔
280
    if (fullStreamDecompression) {
1✔
281
      stream.setFullStreamDecompression(fullStreamDecompression);
×
282
    }
283
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
284
    channelCallsTracer.reportCallStarted();
1✔
285
    stream.start(new ClientStreamListenerImpl(observer));
1✔
286

287
    // Delay any sources of cancellation after start(), because most of the transports are broken if
288
    // they receive cancel before start. Issue #1343 has more details
289

290
    // Propagate later Context cancellation to the remote side.
291
    cancellationHandler.setUp();
1✔
292
  }
1✔
293

294
  private void applyMethodConfig() {
295
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
296
    if (info == null) {
1✔
297
      return;
1✔
298
    }
299
    if (info.timeoutNanos != null) {
1✔
300
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
301
      Deadline existingDeadline = callOptions.getDeadline();
1✔
302
      // If the new deadline is sooner than the existing deadline, swap them.
303
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
304
        callOptions = callOptions.withDeadline(newDeadline);
1✔
305
      }
306
    }
307
    if (info.waitForReady != null) {
1✔
308
      callOptions =
1✔
309
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
310
    }
311
    if (info.maxInboundMessageSize != null) {
1✔
312
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
313
      if (existingLimit != null) {
×
314
        callOptions =
×
315
            callOptions.withMaxInboundMessageSize(
×
316
                Math.min(existingLimit, info.maxInboundMessageSize));
×
317
      } else {
318
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
319
      }
320
    }
321
    if (info.maxOutboundMessageSize != null) {
1✔
322
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
323
      if (existingLimit != null) {
×
324
        callOptions =
×
325
            callOptions.withMaxOutboundMessageSize(
×
326
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
327
      } else {
328
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
329
      }
330
    }
331
  }
1✔
332

333
  private final class CancellationHandler implements Runnable, CancellationListener {
334
    private final boolean contextIsDeadlineSource;
335
    private final boolean hasDeadline;
336
    private final long remainingNanos;
337
    private volatile ScheduledFuture<?> deadlineCancellationFuture;
338
    private volatile boolean tearDownCalled;
339

340
    CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
1✔
341
      this.contextIsDeadlineSource = contextIsDeadlineSource;
1✔
342
      if (deadline == null) {
1✔
343
        hasDeadline = false;
1✔
344
        remainingNanos = 0;
1✔
345
      } else {
346
        hasDeadline = true;
1✔
347
        remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
348
      }
349
    }
1✔
350

351
    void setUp() {
352
      if (tearDownCalled) {
1✔
353
        return;
1✔
354
      }
355
      if (hasDeadline
1✔
356
          // If the context has the effective deadline, we don't need to schedule an extra task.
357
          && !contextIsDeadlineSource
358
          // If the channel has been terminated, we don't need to schedule an extra task.
359
          && deadlineCancellationExecutor != null) {
1✔
360
        deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
1✔
361
            new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
362
      }
363
      context.addListener(this, directExecutor());
1✔
364
      if (tearDownCalled) {
1✔
365
        // Race detected! Re-run to make sure the future is cancelled and context listener removed
366
        tearDown();
1✔
367
      }
368
    }
1✔
369

370
    // May be called multiple times, and race with setUp()
371
    void tearDown() {
372
      tearDownCalled = true;
1✔
373
      ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
1✔
374
      if (deadlineCancellationFuture != null) {
1✔
375
        deadlineCancellationFuture.cancel(false);
1✔
376
      }
377
      context.removeListener(this);
1✔
378
    }
1✔
379

380
    @Override
381
    public void cancelled(Context context) {
382
      if (hasDeadline && contextIsDeadlineSource
1✔
383
          && context.cancellationCause() instanceof TimeoutException) {
1✔
384
        stream.cancel(formatDeadlineExceededStatus());
1✔
385
        return;
1✔
386
      }
387
      stream.cancel(statusFromCancelled(context));
1✔
388
    }
1✔
389

390
    @Override
391
    public void run() {
392
      stream.cancel(formatDeadlineExceededStatus());
1✔
393
    }
1✔
394

395
    Status formatDeadlineExceededStatus() {
396
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
397
      // stream is created.
398
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
399
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
400

401
      StringBuilder buf = new StringBuilder();
1✔
402
      buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
1✔
403
      buf.append(" deadline exceeded after ");
1✔
404
      if (remainingNanos < 0) {
1✔
405
        buf.append('-');
1✔
406
      }
407
      buf.append(seconds);
1✔
408
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
409
      buf.append("s. ");
1✔
410
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
411
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
1✔
412
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
413
      if (stream != null) {
1✔
414
        InsightBuilder insight = new InsightBuilder();
1✔
415
        stream.appendTimeoutInsight(insight);
1✔
416
        buf.append(" ");
1✔
417
        buf.append(insight);
1✔
418
      }
419
      return DEADLINE_EXCEEDED.withDescription(buf.toString());
1✔
420
    }
421
  }
422

423
  @Nullable
424
  private Deadline effectiveDeadline() {
425
    // Call options and context are immutable, so we don't need to cache the deadline.
426
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
427
  }
428

429
  @Nullable
430
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
431
    if (deadline0 == null) {
1✔
432
      return deadline1;
1✔
433
    }
434
    if (deadline1 == null) {
1✔
435
      return deadline0;
1✔
436
    }
437
    return deadline0.minimum(deadline1);
1✔
438
  }
439

440
  @Override
441
  public void request(int numMessages) {
442
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
443
      PerfMark.attachTag(tag);
1✔
444
      checkState(stream != null, "Not started");
1✔
445
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
446
      stream.request(numMessages);
1✔
447
    }
448
  }
1✔
449

450
  @Override
451
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
452
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
453
      PerfMark.attachTag(tag);
1✔
454
      cancelInternal(message, cause);
1✔
455
    }
456
  }
1✔
457

458
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
459
    if (message == null && cause == null) {
1✔
460
      cause = new CancellationException("Cancelled without a message or cause");
×
461
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
462
    }
463
    if (cancelCalled) {
1✔
464
      return;
1✔
465
    }
466
    cancelCalled = true;
1✔
467
    try {
468
      // Cancel is called in exception handling cases, so it may be the case that the
469
      // stream was never successfully created or start has never been called.
470
      if (stream != null) {
1✔
471
        Status status = Status.CANCELLED;
1✔
472
        if (message != null) {
1✔
473
          status = status.withDescription(message);
1✔
474
        } else {
475
          status = status.withDescription("Call cancelled without message");
1✔
476
        }
477
        if (cause != null) {
1✔
478
          status = status.withCause(cause);
1✔
479
        }
480
        stream.cancel(status);
1✔
481
      }
482
    } finally {
483
      cancellationHandler.tearDown();
1✔
484
    }
485
  }
1✔
486

487
  @Override
488
  public void halfClose() {
489
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
490
      PerfMark.attachTag(tag);
1✔
491
      halfCloseInternal();
1✔
492
    }
493
  }
1✔
494

495
  private void halfCloseInternal() {
496
    checkState(stream != null, "Not started");
1✔
497
    checkState(!cancelCalled, "call was cancelled");
1✔
498
    checkState(!halfCloseCalled, "call already half-closed");
1✔
499
    halfCloseCalled = true;
1✔
500
    stream.halfClose();
1✔
501
  }
1✔
502

503
  @Override
504
  public void sendMessage(ReqT message) {
505
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
506
      PerfMark.attachTag(tag);
1✔
507
      sendMessageInternal(message);
1✔
508
    }
509
  }
1✔
510

511
  private void sendMessageInternal(ReqT message) {
512
    checkState(stream != null, "Not started");
1✔
513
    checkState(!cancelCalled, "call was cancelled");
1✔
514
    checkState(!halfCloseCalled, "call was half-closed");
1✔
515
    try {
516
      if (stream instanceof RetriableStream) {
1✔
517
        @SuppressWarnings("unchecked")
518
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
519
        retriableStream.sendMessage(message);
1✔
520
      } else {
1✔
521
        stream.writeMessage(method.streamRequest(message));
1✔
522
      }
523
    } catch (RuntimeException e) {
1✔
524
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
525
      return;
1✔
526
    } catch (Error e) {
×
527
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
528
      throw e;
×
529
    }
1✔
530
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
531
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
532
    // possibility of broken applications forgetting to call halfClose without noticing.
533
    if (!unaryRequest) {
1✔
534
      stream.flush();
1✔
535
    }
536
  }
1✔
537

538
  @Override
539
  public void setMessageCompression(boolean enabled) {
540
    checkState(stream != null, "Not started");
1✔
541
    stream.setMessageCompression(enabled);
1✔
542
  }
1✔
543

544
  @Override
545
  public boolean isReady() {
546
    if (halfCloseCalled) {
1✔
547
      return false;
1✔
548
    }
549
    return stream.isReady();
1✔
550
  }
551

552
  @Override
553
  public Attributes getAttributes() {
554
    if (stream != null) {
1✔
555
      return stream.getAttributes();
1✔
556
    }
557
    return Attributes.EMPTY;
1✔
558
  }
559

560
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
561
    observer.onClose(status, trailers);
1✔
562
  }
1✔
563

564
  @Override
565
  public String toString() {
566
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
567
  }
568

569
  private class ClientStreamListenerImpl implements ClientStreamListener {
570
    private final Listener<RespT> observer;
571
    private Status exceptionStatus;
572

573
    public ClientStreamListenerImpl(Listener<RespT> observer) {
1✔
574
      this.observer = checkNotNull(observer, "observer");
1✔
575
    }
1✔
576

577
    /**
578
     * Cancels call and schedules onClose() notification. May only be called from the application
579
     * thread.
580
     */
581
    private void exceptionThrown(Status status) {
582
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
583
      // will be no further callbacks. We set the status here and overwrite the onClose() details
584
      // when it arrives.
585
      exceptionStatus = status;
1✔
586
      stream.cancel(status);
1✔
587
    }
1✔
588

589
    @Override
590
    public void headersRead(final Metadata headers) {
591
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
592
        PerfMark.attachTag(tag);
1✔
593
        final Link link = PerfMark.linkOut();
1✔
594
        final class HeadersRead extends ContextRunnable {
595
          HeadersRead() {
1✔
596
            super(context);
1✔
597
          }
1✔
598

599
          @Override
600
          public void runInContext() {
601
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
602
              PerfMark.attachTag(tag);
1✔
603
              PerfMark.linkIn(link);
1✔
604
              runInternal();
1✔
605
            }
606
          }
1✔
607

608
          private void runInternal() {
609
            if (exceptionStatus != null) {
1✔
610
              return;
×
611
            }
612
            try {
613
              observer.onHeaders(headers);
1✔
614
            } catch (Throwable t) {
1✔
615
              exceptionThrown(
1✔
616
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
617
            }
1✔
618
          }
1✔
619
        }
620

621
        callExecutor.execute(new HeadersRead());
1✔
622
      }
623
    }
1✔
624

625
    @Override
626
    public void messagesAvailable(final MessageProducer producer) {
627
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
628
        PerfMark.attachTag(tag);
1✔
629
        final Link link = PerfMark.linkOut();
1✔
630
        final class MessagesAvailable extends ContextRunnable {
631
          MessagesAvailable() {
1✔
632
            super(context);
1✔
633
          }
1✔
634

635
          @Override
636
          public void runInContext() {
637
            try (TaskCloseable ignore =
1✔
638
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
639
              PerfMark.attachTag(tag);
1✔
640
              PerfMark.linkIn(link);
1✔
641
              runInternal();
1✔
642
            }
643
          }
1✔
644

645
          private void runInternal() {
646
            if (exceptionStatus != null) {
1✔
647
              GrpcUtil.closeQuietly(producer);
×
648
              return;
×
649
            }
650
            try {
651
              InputStream message;
652
              while ((message = producer.next()) != null) {
1✔
653
                try {
654
                  observer.onMessage(method.parseResponse(message));
1✔
655
                } catch (Throwable t) {
1✔
656
                  GrpcUtil.closeQuietly(message);
1✔
657
                  throw t;
1✔
658
                }
1✔
659
                message.close();
1✔
660
              }
661
            } catch (Throwable t) {
1✔
662
              GrpcUtil.closeQuietly(producer);
1✔
663
              exceptionThrown(
1✔
664
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
665
            }
1✔
666
          }
1✔
667
        }
668

669
        callExecutor.execute(new MessagesAvailable());
1✔
670
      }
671
    }
1✔
672

673
    @Override
674
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
675
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
676
        PerfMark.attachTag(tag);
1✔
677
        closedInternal(status, rpcProgress, trailers);
1✔
678
      }
679
    }
1✔
680

681
    private void closedInternal(
682
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
683
      Deadline deadline = effectiveDeadline();
1✔
684
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
685
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
686
        // description. Since our timer may be delayed in firing, we double-check the deadline and
687
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
688
        if (deadline.isExpired()) {
1✔
689
          status = cancellationHandler.formatDeadlineExceededStatus();
1✔
690
          // Replace trailers to prevent mixing sources of status and trailers.
691
          trailers = new Metadata();
1✔
692
        }
693
      }
694
      final Status savedStatus = status;
1✔
695
      final Metadata savedTrailers = trailers;
1✔
696
      final Link link = PerfMark.linkOut();
1✔
697
      final class StreamClosed extends ContextRunnable {
698
        StreamClosed() {
1✔
699
          super(context);
1✔
700
        }
1✔
701

702
        @Override
703
        public void runInContext() {
704
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
705
            PerfMark.attachTag(tag);
1✔
706
            PerfMark.linkIn(link);
1✔
707
            runInternal();
1✔
708
          }
709
        }
1✔
710

711
        private void runInternal() {
712
          cancellationHandler.tearDown();
1✔
713
          Status status = savedStatus;
1✔
714
          Metadata trailers = savedTrailers;
1✔
715
          if (exceptionStatus != null) {
1✔
716
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
717
            // However the cancel is racy and this closed() may have already been queued when the
718
            // cancellation occurred. Since other calls like onMessage() will throw away data if
719
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
720
            // provided by the server.
721
            status = exceptionStatus;
1✔
722
            // Replace trailers to prevent mixing sources of status and trailers.
723
            trailers = new Metadata();
1✔
724
          }
725
          try {
726
            closeObserver(observer, status, trailers);
1✔
727
          } finally {
728
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
729
          }
730
        }
1✔
731
      }
732

733
      callExecutor.execute(new StreamClosed());
1✔
734
    }
1✔
735

736
    @Override
737
    public void onReady() {
738
      if (method.getType().clientSendsOneMessage()) {
1✔
739
        return;
1✔
740
      }
741
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
742
        PerfMark.attachTag(tag);
1✔
743
        final Link link = PerfMark.linkOut();
1✔
744

745
        final class StreamOnReady extends ContextRunnable {
746
          StreamOnReady() {
1✔
747
            super(context);
1✔
748
          }
1✔
749

750
          @Override
751
          public void runInContext() {
752
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
753
              PerfMark.attachTag(tag);
1✔
754
              PerfMark.linkIn(link);
1✔
755
              runInternal();
1✔
756
            }
757
          }
1✔
758

759
          private void runInternal() {
760
            if (exceptionStatus != null) {
1✔
761
              return;
×
762
            }
763
            try {
764
              observer.onReady();
1✔
765
            } catch (Throwable t) {
1✔
766
              exceptionThrown(
1✔
767
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
768
            }
1✔
769
          }
1✔
770
        }
771

772
        callExecutor.execute(new StreamOnReady());
1✔
773
      }
774
    }
1✔
775
  }
776
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc