• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19684

11 Feb 2025 06:08AM CUT coverage: 88.616% (+0.003%) from 88.613%
#19684

push

github

web-flow
core: logging the error message when onClose() itself fails (#11880)

34190 of 38582 relevant lines covered (88.62%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.03
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31

32
import com.google.common.annotations.VisibleForTesting;
33
import com.google.common.base.MoreObjects;
34
import io.grpc.Attributes;
35
import io.grpc.CallOptions;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.Codec;
39
import io.grpc.Compressor;
40
import io.grpc.CompressorRegistry;
41
import io.grpc.Context;
42
import io.grpc.Context.CancellationListener;
43
import io.grpc.Deadline;
44
import io.grpc.DecompressorRegistry;
45
import io.grpc.InternalConfigSelector;
46
import io.grpc.InternalDecompressorRegistry;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.MethodDescriptor.MethodType;
50
import io.grpc.Status;
51
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52
import io.perfmark.Link;
53
import io.perfmark.PerfMark;
54
import io.perfmark.Tag;
55
import io.perfmark.TaskCloseable;
56
import java.io.InputStream;
57
import java.nio.charset.Charset;
58
import java.util.Locale;
59
import java.util.concurrent.CancellationException;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.ScheduledExecutorService;
62
import java.util.concurrent.ScheduledFuture;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  private final MethodDescriptor<ReqT, RespT> method;
80
  private final Tag tag;
81
  private final Executor callExecutor;
82
  private final boolean callExecutorIsDirect;
83
  private final CallTracer channelCallsTracer;
84
  private final Context context;
85
  private CancellationHandler cancellationHandler;
86
  private final boolean unaryRequest;
87
  private CallOptions callOptions;
88
  private ClientStream stream;
89
  private boolean cancelCalled;
90
  private boolean halfCloseCalled;
91
  private final ClientStreamProvider clientStreamProvider;
92
  private final ScheduledExecutorService deadlineCancellationExecutor;
93
  private boolean fullStreamDecompression;
94
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
95
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
96

97
  ClientCallImpl(
98
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
99
      ClientStreamProvider clientStreamProvider,
100
      ScheduledExecutorService deadlineCancellationExecutor,
101
      CallTracer channelCallsTracer,
102
      // TODO(zdapeng): remove this arg
103
      @Nullable InternalConfigSelector configSelector) {
1✔
104
    this.method = method;
1✔
105
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
106
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
107
    // If we know that the executor is a direct executor, we don't need to wrap it with a
108
    // SerializingExecutor. This is purely for performance reasons.
109
    // See https://github.com/grpc/grpc-java/issues/368
110
    if (executor == directExecutor()) {
1✔
111
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
112
      callExecutorIsDirect = true;
1✔
113
    } else {
114
      this.callExecutor = new SerializingExecutor(executor);
1✔
115
      callExecutorIsDirect = false;
1✔
116
    }
117
    this.channelCallsTracer = channelCallsTracer;
1✔
118
    // Propagate the context from the thread which initiated the call to all callbacks.
119
    this.context = Context.current();
1✔
120
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
121
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
122
    this.callOptions = callOptions;
1✔
123
    this.clientStreamProvider = clientStreamProvider;
1✔
124
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
125
    PerfMark.event("ClientCall.<init>", tag);
1✔
126
  }
1✔
127

128
  /**
129
   * Provider of {@link ClientStream}s.
130
   */
131
  interface ClientStreamProvider {
132
    ClientStream newStream(
133
        MethodDescriptor<?, ?> method,
134
        CallOptions callOptions,
135
        Metadata headers,
136
        Context context);
137
  }
138

139
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140
    this.fullStreamDecompression = fullStreamDecompression;
1✔
141
    return this;
1✔
142
  }
143

144
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145
    this.decompressorRegistry = decompressorRegistry;
1✔
146
    return this;
1✔
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150
    this.compressorRegistry = compressorRegistry;
1✔
151
    return this;
1✔
152
  }
153

154
  @VisibleForTesting
155
  static void prepareHeaders(
156
      Metadata headers,
157
      DecompressorRegistry decompressorRegistry,
158
      Compressor compressor,
159
      boolean fullStreamDecompression) {
160
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
161
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
162
    if (compressor != Codec.Identity.NONE) {
1✔
163
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
164
    }
165

166
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
167
    byte[] advertisedEncodings =
1✔
168
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
169
    if (advertisedEncodings.length != 0) {
1✔
170
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
171
    }
172

173
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
174
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
175
    if (fullStreamDecompression) {
1✔
176
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void start(Listener<RespT> observer, Metadata headers) {
182
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
183
      PerfMark.attachTag(tag);
1✔
184
      startInternal(observer, headers);
1✔
185
    }
186
  }
1✔
187

188
  private void startInternal(Listener<RespT> observer, Metadata headers) {
189
    checkState(stream == null, "Already started");
1✔
190
    checkState(!cancelCalled, "call was cancelled");
1✔
191
    checkNotNull(observer, "observer");
1✔
192
    checkNotNull(headers, "headers");
1✔
193

194
    if (context.isCancelled()) {
1✔
195
      // Context is already cancelled so no need to create a real stream, just notify the observer
196
      // of cancellation via callback on the executor
197
      stream = NoopClientStream.INSTANCE;
1✔
198
      final Listener<RespT> finalObserver = observer;
1✔
199
      class ClosedByContext extends ContextRunnable {
200
        ClosedByContext() {
1✔
201
          super(context);
1✔
202
        }
1✔
203

204
        @Override
205
        public void runInContext() {
206
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
207
        }
1✔
208
      }
209

210
      callExecutor.execute(new ClosedByContext());
1✔
211
      return;
1✔
212
    }
213
    applyMethodConfig();
1✔
214
    final String compressorName = callOptions.getCompressor();
1✔
215
    Compressor compressor;
216
    if (compressorName != null) {
1✔
217
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
218
      if (compressor == null) {
1✔
219
        stream = NoopClientStream.INSTANCE;
×
220
        final Listener<RespT> finalObserver = observer;
×
221
        class ClosedByNotFoundCompressor extends ContextRunnable {
222
          ClosedByNotFoundCompressor() {
×
223
            super(context);
×
224
          }
×
225

226
          @Override
227
          public void runInContext() {
228
            closeObserver(
×
229
                finalObserver,
230
                Status.INTERNAL.withDescription(
×
231
                    String.format("Unable to find compressor by name %s", compressorName)),
×
232
                new Metadata());
233
          }
×
234
        }
235

236
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
237
        return;
×
238
      }
239
    } else {
240
      compressor = Codec.Identity.NONE;
1✔
241
    }
242
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
243

244
    Deadline effectiveDeadline = effectiveDeadline();
1✔
245
    boolean contextIsDeadlineSource = effectiveDeadline != null
1✔
246
        && effectiveDeadline.equals(context.getDeadline());
1✔
247
    cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
1✔
248
    boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
1✔
249
    if (!deadlineExceeded) {
1✔
250
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
251
    } else {
252
      ClientStreamTracer[] tracers =
1✔
253
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
1✔
254
      String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
1✔
255
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
256
      String description = String.format(
1✔
257
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
258
              + "Name resolution delay %.9f seconds.", deadlineName,
259
          cancellationHandler.remainingNanos / NANO_TO_SECS,
1✔
260
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
261
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
262
    }
263

264
    if (callExecutorIsDirect) {
1✔
265
      stream.optimizeForDirectExecutor();
1✔
266
    }
267
    if (callOptions.getAuthority() != null) {
1✔
268
      stream.setAuthority(callOptions.getAuthority());
1✔
269
    }
270
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
271
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
272
    }
273
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
274
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
275
    }
276
    if (effectiveDeadline != null) {
1✔
277
      stream.setDeadline(effectiveDeadline);
1✔
278
    }
279
    stream.setCompressor(compressor);
1✔
280
    if (fullStreamDecompression) {
1✔
281
      stream.setFullStreamDecompression(fullStreamDecompression);
×
282
    }
283
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
284
    channelCallsTracer.reportCallStarted();
1✔
285
    stream.start(new ClientStreamListenerImpl(observer));
1✔
286

287
    // Delay any sources of cancellation after start(), because most of the transports are broken if
288
    // they receive cancel before start. Issue #1343 has more details
289

290
    // Propagate later Context cancellation to the remote side.
291
    cancellationHandler.setUp();
1✔
292
  }
1✔
293

294
  private void applyMethodConfig() {
295
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
296
    if (info == null) {
1✔
297
      return;
1✔
298
    }
299
    if (info.timeoutNanos != null) {
1✔
300
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
301
      Deadline existingDeadline = callOptions.getDeadline();
1✔
302
      // If the new deadline is sooner than the existing deadline, swap them.
303
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
304
        callOptions = callOptions.withDeadline(newDeadline);
1✔
305
      }
306
    }
307
    if (info.waitForReady != null) {
1✔
308
      callOptions =
1✔
309
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
310
    }
311
    if (info.maxInboundMessageSize != null) {
1✔
312
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
313
      if (existingLimit != null) {
×
314
        callOptions =
×
315
            callOptions.withMaxInboundMessageSize(
×
316
                Math.min(existingLimit, info.maxInboundMessageSize));
×
317
      } else {
318
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
319
      }
320
    }
321
    if (info.maxOutboundMessageSize != null) {
1✔
322
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
323
      if (existingLimit != null) {
×
324
        callOptions =
×
325
            callOptions.withMaxOutboundMessageSize(
×
326
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
327
      } else {
328
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
329
      }
330
    }
331
  }
1✔
332

333
  private final class CancellationHandler implements Runnable, CancellationListener {
334
    private final boolean contextIsDeadlineSource;
335
    private final boolean hasDeadline;
336
    private final long remainingNanos;
337
    private volatile ScheduledFuture<?> deadlineCancellationFuture;
338
    private volatile boolean tearDownCalled;
339

340
    CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
1✔
341
      this.contextIsDeadlineSource = contextIsDeadlineSource;
1✔
342
      if (deadline == null) {
1✔
343
        hasDeadline = false;
1✔
344
        remainingNanos = 0;
1✔
345
      } else {
346
        hasDeadline = true;
1✔
347
        remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
348
      }
349
    }
1✔
350

351
    void setUp() {
352
      if (tearDownCalled) {
1✔
353
        return;
1✔
354
      }
355
      if (hasDeadline
1✔
356
          // If the context has the effective deadline, we don't need to schedule an extra task.
357
          && !contextIsDeadlineSource
358
          // If the channel has been terminated, we don't need to schedule an extra task.
359
          && deadlineCancellationExecutor != null) {
1✔
360
        deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
1✔
361
            new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
362
      }
363
      context.addListener(this, directExecutor());
1✔
364
      if (tearDownCalled) {
1✔
365
        // Race detected! Re-run to make sure the future is cancelled and context listener removed
366
        tearDown();
1✔
367
      }
368
    }
1✔
369

370
    // May be called multiple times, and race with setUp()
371
    void tearDown() {
372
      tearDownCalled = true;
1✔
373
      ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
1✔
374
      if (deadlineCancellationFuture != null) {
1✔
375
        deadlineCancellationFuture.cancel(false);
1✔
376
      }
377
      context.removeListener(this);
1✔
378
    }
1✔
379

380
    @Override
381
    public void cancelled(Context context) {
382
      if (hasDeadline && contextIsDeadlineSource
1✔
383
          && context.cancellationCause() instanceof TimeoutException) {
1✔
384
        stream.cancel(formatDeadlineExceededStatus());
1✔
385
        return;
1✔
386
      }
387
      stream.cancel(statusFromCancelled(context));
1✔
388
    }
1✔
389

390
    @Override
391
    public void run() {
392
      stream.cancel(formatDeadlineExceededStatus());
1✔
393
    }
1✔
394

395
    Status formatDeadlineExceededStatus() {
396
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
397
      // stream is created.
398
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
399
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
400

401
      StringBuilder buf = new StringBuilder();
1✔
402
      buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
1✔
403
      buf.append(" deadline exceeded after ");
1✔
404
      if (remainingNanos < 0) {
1✔
405
        buf.append('-');
1✔
406
      }
407
      buf.append(seconds);
1✔
408
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
409
      buf.append("s. ");
1✔
410
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
411
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
1✔
412
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
413
      if (stream != null) {
1✔
414
        InsightBuilder insight = new InsightBuilder();
1✔
415
        stream.appendTimeoutInsight(insight);
1✔
416
        buf.append(" ");
1✔
417
        buf.append(insight);
1✔
418
      }
419
      return DEADLINE_EXCEEDED.withDescription(buf.toString());
1✔
420
    }
421
  }
422

423
  @Nullable
424
  private Deadline effectiveDeadline() {
425
    // Call options and context are immutable, so we don't need to cache the deadline.
426
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
427
  }
428

429
  @Nullable
430
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
431
    if (deadline0 == null) {
1✔
432
      return deadline1;
1✔
433
    }
434
    if (deadline1 == null) {
1✔
435
      return deadline0;
1✔
436
    }
437
    return deadline0.minimum(deadline1);
1✔
438
  }
439

440
  @Override
441
  public void request(int numMessages) {
442
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
443
      PerfMark.attachTag(tag);
1✔
444
      checkState(stream != null, "Not started");
1✔
445
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
446
      stream.request(numMessages);
1✔
447
    }
448
  }
1✔
449

450
  @Override
451
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
452
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
453
      PerfMark.attachTag(tag);
1✔
454
      cancelInternal(message, cause);
1✔
455
    }
456
  }
1✔
457

458
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
459
    if (message == null && cause == null) {
1✔
460
      cause = new CancellationException("Cancelled without a message or cause");
×
461
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
462
    }
463
    if (cancelCalled) {
1✔
464
      return;
1✔
465
    }
466
    cancelCalled = true;
1✔
467
    try {
468
      // Cancel is called in exception handling cases, so it may be the case that the
469
      // stream was never successfully created or start has never been called.
470
      if (stream != null) {
1✔
471
        Status status = Status.CANCELLED;
1✔
472
        if (message != null) {
1✔
473
          status = status.withDescription(message);
1✔
474
        } else {
475
          status = status.withDescription("Call cancelled without message");
1✔
476
        }
477
        if (cause != null) {
1✔
478
          status = status.withCause(cause);
1✔
479
        }
480
        stream.cancel(status);
1✔
481
      }
482
    } finally {
483
      // start() might not have been called
484
      if (cancellationHandler != null) {
1✔
485
        cancellationHandler.tearDown();
1✔
486
      }
487
    }
488
  }
1✔
489

490
  @Override
491
  public void halfClose() {
492
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
493
      PerfMark.attachTag(tag);
1✔
494
      halfCloseInternal();
1✔
495
    }
496
  }
1✔
497

498
  private void halfCloseInternal() {
499
    checkState(stream != null, "Not started");
1✔
500
    checkState(!cancelCalled, "call was cancelled");
1✔
501
    checkState(!halfCloseCalled, "call already half-closed");
1✔
502
    halfCloseCalled = true;
1✔
503
    stream.halfClose();
1✔
504
  }
1✔
505

506
  @Override
507
  public void sendMessage(ReqT message) {
508
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
509
      PerfMark.attachTag(tag);
1✔
510
      sendMessageInternal(message);
1✔
511
    }
512
  }
1✔
513

514
  private void sendMessageInternal(ReqT message) {
515
    checkState(stream != null, "Not started");
1✔
516
    checkState(!cancelCalled, "call was cancelled");
1✔
517
    checkState(!halfCloseCalled, "call was half-closed");
1✔
518
    try {
519
      if (stream instanceof RetriableStream) {
1✔
520
        @SuppressWarnings("unchecked")
521
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
522
        retriableStream.sendMessage(message);
1✔
523
      } else {
1✔
524
        stream.writeMessage(method.streamRequest(message));
1✔
525
      }
526
    } catch (RuntimeException e) {
1✔
527
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
528
      return;
1✔
529
    } catch (Error e) {
×
530
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
531
      throw e;
×
532
    }
1✔
533
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
534
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
535
    // possibility of broken applications forgetting to call halfClose without noticing.
536
    if (!unaryRequest) {
1✔
537
      stream.flush();
1✔
538
    }
539
  }
1✔
540

541
  @Override
542
  public void setMessageCompression(boolean enabled) {
543
    checkState(stream != null, "Not started");
1✔
544
    stream.setMessageCompression(enabled);
1✔
545
  }
1✔
546

547
  @Override
548
  public boolean isReady() {
549
    if (halfCloseCalled) {
1✔
550
      return false;
1✔
551
    }
552
    return stream.isReady();
1✔
553
  }
554

555
  @Override
556
  public Attributes getAttributes() {
557
    if (stream != null) {
1✔
558
      return stream.getAttributes();
1✔
559
    }
560
    return Attributes.EMPTY;
1✔
561
  }
562

563
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
564
    try {
565
      observer.onClose(status, trailers);
1✔
566
    } catch (RuntimeException ex) {
1✔
567
      log.log(Level.WARNING, "Exception thrown by onClose() in ClientCall", ex);
1✔
568
    }
1✔
569
  }
1✔
570

571
  @Override
572
  public String toString() {
573
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
574
  }
575

576
  private class ClientStreamListenerImpl implements ClientStreamListener {
577
    private final Listener<RespT> observer;
578
    private Status exceptionStatus;
579

580
    public ClientStreamListenerImpl(Listener<RespT> observer) {
1✔
581
      this.observer = checkNotNull(observer, "observer");
1✔
582
    }
1✔
583

584
    /**
585
     * Cancels call and schedules onClose() notification. May only be called from the application
586
     * thread.
587
     */
588
    private void exceptionThrown(Status status) {
589
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
590
      // will be no further callbacks. We set the status here and overwrite the onClose() details
591
      // when it arrives.
592
      exceptionStatus = status;
1✔
593
      stream.cancel(status);
1✔
594
    }
1✔
595

596
    @Override
597
    public void headersRead(final Metadata headers) {
598
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
599
        PerfMark.attachTag(tag);
1✔
600
        final Link link = PerfMark.linkOut();
1✔
601
        final class HeadersRead extends ContextRunnable {
602
          HeadersRead() {
1✔
603
            super(context);
1✔
604
          }
1✔
605

606
          @Override
607
          public void runInContext() {
608
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
609
              PerfMark.attachTag(tag);
1✔
610
              PerfMark.linkIn(link);
1✔
611
              runInternal();
1✔
612
            }
613
          }
1✔
614

615
          private void runInternal() {
616
            if (exceptionStatus != null) {
1✔
617
              return;
×
618
            }
619
            try {
620
              observer.onHeaders(headers);
1✔
621
            } catch (Throwable t) {
1✔
622
              exceptionThrown(
1✔
623
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
624
            }
1✔
625
          }
1✔
626
        }
627

628
        callExecutor.execute(new HeadersRead());
1✔
629
      }
630
    }
1✔
631

632
    @Override
633
    public void messagesAvailable(final MessageProducer producer) {
634
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
635
        PerfMark.attachTag(tag);
1✔
636
        final Link link = PerfMark.linkOut();
1✔
637
        final class MessagesAvailable extends ContextRunnable {
638
          MessagesAvailable() {
1✔
639
            super(context);
1✔
640
          }
1✔
641

642
          @Override
643
          public void runInContext() {
644
            try (TaskCloseable ignore =
1✔
645
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
646
              PerfMark.attachTag(tag);
1✔
647
              PerfMark.linkIn(link);
1✔
648
              runInternal();
1✔
649
            }
650
          }
1✔
651

652
          private void runInternal() {
653
            if (exceptionStatus != null) {
1✔
654
              GrpcUtil.closeQuietly(producer);
×
655
              return;
×
656
            }
657
            try {
658
              InputStream message;
659
              while ((message = producer.next()) != null) {
1✔
660
                try {
661
                  observer.onMessage(method.parseResponse(message));
1✔
662
                } catch (Throwable t) {
1✔
663
                  GrpcUtil.closeQuietly(message);
1✔
664
                  throw t;
1✔
665
                }
1✔
666
                message.close();
1✔
667
              }
668
            } catch (Throwable t) {
1✔
669
              GrpcUtil.closeQuietly(producer);
1✔
670
              exceptionThrown(
1✔
671
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
672
            }
1✔
673
          }
1✔
674
        }
675

676
        callExecutor.execute(new MessagesAvailable());
1✔
677
      }
678
    }
1✔
679

680
    @Override
681
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
682
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
683
        PerfMark.attachTag(tag);
1✔
684
        closedInternal(status, rpcProgress, trailers);
1✔
685
      }
686
    }
1✔
687

688
    private void closedInternal(
689
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
690
      Deadline deadline = effectiveDeadline();
1✔
691
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
692
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
693
        // description. Since our timer may be delayed in firing, we double-check the deadline and
694
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
695
        if (deadline.isExpired()) {
1✔
696
          status = cancellationHandler.formatDeadlineExceededStatus();
1✔
697
          // Replace trailers to prevent mixing sources of status and trailers.
698
          trailers = new Metadata();
1✔
699
        }
700
      }
701
      final Status savedStatus = status;
1✔
702
      final Metadata savedTrailers = trailers;
1✔
703
      final Link link = PerfMark.linkOut();
1✔
704
      final class StreamClosed extends ContextRunnable {
705
        StreamClosed() {
1✔
706
          super(context);
1✔
707
        }
1✔
708

709
        @Override
710
        public void runInContext() {
711
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
712
            PerfMark.attachTag(tag);
1✔
713
            PerfMark.linkIn(link);
1✔
714
            runInternal();
1✔
715
          }
716
        }
1✔
717

718
        private void runInternal() {
719
          cancellationHandler.tearDown();
1✔
720
          Status status = savedStatus;
1✔
721
          Metadata trailers = savedTrailers;
1✔
722
          if (exceptionStatus != null) {
1✔
723
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
724
            // However the cancel is racy and this closed() may have already been queued when the
725
            // cancellation occurred. Since other calls like onMessage() will throw away data if
726
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
727
            // provided by the server.
728
            status = exceptionStatus;
1✔
729
            // Replace trailers to prevent mixing sources of status and trailers.
730
            trailers = new Metadata();
1✔
731
          }
732
          try {
733
            closeObserver(observer, status, trailers);
1✔
734
          } finally {
735
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
736
          }
737
        }
1✔
738
      }
739

740
      callExecutor.execute(new StreamClosed());
1✔
741
    }
1✔
742

743
    @Override
744
    public void onReady() {
745
      if (method.getType().clientSendsOneMessage()) {
1✔
746
        return;
1✔
747
      }
748
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
749
        PerfMark.attachTag(tag);
1✔
750
        final Link link = PerfMark.linkOut();
1✔
751

752
        final class StreamOnReady extends ContextRunnable {
753
          StreamOnReady() {
1✔
754
            super(context);
1✔
755
          }
1✔
756

757
          @Override
758
          public void runInContext() {
759
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
760
              PerfMark.attachTag(tag);
1✔
761
              PerfMark.linkIn(link);
1✔
762
              runInternal();
1✔
763
            }
764
          }
1✔
765

766
          private void runInternal() {
767
            if (exceptionStatus != null) {
1✔
768
              return;
×
769
            }
770
            try {
771
              observer.onReady();
1✔
772
            } catch (Throwable t) {
1✔
773
              exceptionThrown(
1✔
774
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
775
            }
1✔
776
          }
1✔
777
        }
778

779
        callExecutor.execute(new StreamOnReady());
1✔
780
      }
781
    }
1✔
782
  }
783
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc