• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19666

29 Jan 2025 06:14AM UTC coverage: 88.583% (+0.008%) from 88.575%
#19666

Pull #11856

github

web-flow
Merge eadf2e3bc into 9e8629914
Pull Request #11856: netty: Removed 4096 min buffer size

33729 of 38076 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.69
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31

32
import com.google.common.annotations.VisibleForTesting;
33
import com.google.common.base.MoreObjects;
34
import io.grpc.Attributes;
35
import io.grpc.CallOptions;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.Codec;
39
import io.grpc.Compressor;
40
import io.grpc.CompressorRegistry;
41
import io.grpc.Context;
42
import io.grpc.Context.CancellationListener;
43
import io.grpc.Deadline;
44
import io.grpc.DecompressorRegistry;
45
import io.grpc.InternalConfigSelector;
46
import io.grpc.InternalDecompressorRegistry;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.MethodDescriptor.MethodType;
50
import io.grpc.Status;
51
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52
import io.perfmark.Link;
53
import io.perfmark.PerfMark;
54
import io.perfmark.Tag;
55
import io.perfmark.TaskCloseable;
56
import java.io.InputStream;
57
import java.nio.charset.Charset;
58
import java.util.Locale;
59
import java.util.concurrent.CancellationException;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.ScheduledExecutorService;
62
import java.util.concurrent.ScheduledFuture;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  private final MethodDescriptor<ReqT, RespT> method;
80
  private final Tag tag;
81
  private final Executor callExecutor;
82
  private final boolean callExecutorIsDirect;
83
  private final CallTracer channelCallsTracer;
84
  private final Context context;
85
  private CancellationHandler cancellationHandler;
86
  private final boolean unaryRequest;
87
  private CallOptions callOptions;
88
  private ClientStream stream;
89
  private boolean cancelCalled;
90
  private boolean halfCloseCalled;
91
  private final ClientStreamProvider clientStreamProvider;
92
  private final ScheduledExecutorService deadlineCancellationExecutor;
93
  private boolean fullStreamDecompression;
94
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
95
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
96

97
  ClientCallImpl(
98
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
99
      ClientStreamProvider clientStreamProvider,
100
      ScheduledExecutorService deadlineCancellationExecutor,
101
      CallTracer channelCallsTracer,
102
      // TODO(zdapeng): remove this arg
103
      @Nullable InternalConfigSelector configSelector) {
1✔
104
    this.method = method;
1✔
105
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
106
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
107
    // If we know that the executor is a direct executor, we don't need to wrap it with a
108
    // SerializingExecutor. This is purely for performance reasons.
109
    // See https://github.com/grpc/grpc-java/issues/368
110
    if (executor == directExecutor()) {
1✔
111
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
112
      callExecutorIsDirect = true;
1✔
113
    } else {
114
      this.callExecutor = new SerializingExecutor(executor);
1✔
115
      callExecutorIsDirect = false;
1✔
116
    }
117
    this.channelCallsTracer = channelCallsTracer;
1✔
118
    // Propagate the context from the thread which initiated the call to all callbacks.
119
    this.context = Context.current();
1✔
120
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
121
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
122
    this.callOptions = callOptions;
1✔
123
    this.clientStreamProvider = clientStreamProvider;
1✔
124
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
125
    PerfMark.event("ClientCall.<init>", tag);
1✔
126
  }
1✔
127

128
  /**
129
   * Provider of {@link ClientStream}s.
130
   */
131
  interface ClientStreamProvider {
132
    ClientStream newStream(
133
        MethodDescriptor<?, ?> method,
134
        CallOptions callOptions,
135
        Metadata headers,
136
        Context context);
137
  }
138

139
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140
    this.fullStreamDecompression = fullStreamDecompression;
1✔
141
    return this;
1✔
142
  }
143

144
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145
    this.decompressorRegistry = decompressorRegistry;
1✔
146
    return this;
1✔
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150
    this.compressorRegistry = compressorRegistry;
1✔
151
    return this;
1✔
152
  }
153

154
  @VisibleForTesting
155
  static void prepareHeaders(
156
      Metadata headers,
157
      DecompressorRegistry decompressorRegistry,
158
      Compressor compressor,
159
      boolean fullStreamDecompression) {
160
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
161
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
162
    if (compressor != Codec.Identity.NONE) {
1✔
163
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
164
    }
165

166
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
167
    byte[] advertisedEncodings =
1✔
168
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
169
    if (advertisedEncodings.length != 0) {
1✔
170
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
171
    }
172

173
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
174
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
175
    if (fullStreamDecompression) {
1✔
176
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void start(Listener<RespT> observer, Metadata headers) {
182
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
183
      PerfMark.attachTag(tag);
1✔
184
      startInternal(observer, headers);
1✔
185
    }
186
  }
1✔
187

188
  private void startInternal(Listener<RespT> observer, Metadata headers) {
189
    checkState(stream == null, "Already started");
1✔
190
    checkState(!cancelCalled, "call was cancelled");
1✔
191
    checkNotNull(observer, "observer");
1✔
192
    checkNotNull(headers, "headers");
1✔
193

194
    if (context.isCancelled()) {
1✔
195
      // Context is already cancelled so no need to create a real stream, just notify the observer
196
      // of cancellation via callback on the executor
197
      stream = NoopClientStream.INSTANCE;
1✔
198
      final Listener<RespT> finalObserver = observer;
1✔
199
      class ClosedByContext extends ContextRunnable {
200
        ClosedByContext() {
1✔
201
          super(context);
1✔
202
        }
1✔
203

204
        @Override
205
        public void runInContext() {
206
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
207
        }
1✔
208
      }
209

210
      callExecutor.execute(new ClosedByContext());
1✔
211
      return;
1✔
212
    }
213
    applyMethodConfig();
1✔
214
    final String compressorName = callOptions.getCompressor();
1✔
215
    Compressor compressor;
216
    if (compressorName != null) {
1✔
217
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
218
      if (compressor == null) {
1✔
219
        stream = NoopClientStream.INSTANCE;
×
220
        final Listener<RespT> finalObserver = observer;
×
221
        class ClosedByNotFoundCompressor extends ContextRunnable {
222
          ClosedByNotFoundCompressor() {
×
223
            super(context);
×
224
          }
×
225

226
          @Override
227
          public void runInContext() {
228
            closeObserver(
×
229
                finalObserver,
230
                Status.INTERNAL.withDescription(
×
231
                    String.format("Unable to find compressor by name %s", compressorName)),
×
232
                new Metadata());
233
          }
×
234
        }
235

236
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
237
        return;
×
238
      }
239
    } else {
240
      compressor = Codec.Identity.NONE;
1✔
241
    }
242
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
243

244
    Deadline effectiveDeadline = effectiveDeadline();
1✔
245
    boolean contextIsDeadlineSource = effectiveDeadline != null
1✔
246
        && effectiveDeadline.equals(context.getDeadline());
1✔
247
    cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
1✔
248
    boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
1✔
249
    if (!deadlineExceeded) {
1✔
250
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
251
    } else {
252
      ClientStreamTracer[] tracers =
1✔
253
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
1✔
254
      String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
1✔
255
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
256
      String description = String.format(
1✔
257
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
258
              + "Name resolution delay %.9f seconds.", deadlineName,
259
          cancellationHandler.remainingNanos / NANO_TO_SECS,
1✔
260
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
261
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
262
    }
263

264
    if (callExecutorIsDirect) {
1✔
265
      stream.optimizeForDirectExecutor();
1✔
266
    }
267
    if (callOptions.getAuthority() != null) {
1✔
268
      stream.setAuthority(callOptions.getAuthority());
1✔
269
    }
270
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
271
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
272
    }
273
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
274
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
275
    }
276
    if (effectiveDeadline != null) {
1✔
277
      stream.setDeadline(effectiveDeadline);
1✔
278
    }
279
    stream.setCompressor(compressor);
1✔
280
    if (fullStreamDecompression) {
1✔
281
      stream.setFullStreamDecompression(fullStreamDecompression);
×
282
    }
283
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
284
    channelCallsTracer.reportCallStarted();
1✔
285
    stream.start(new ClientStreamListenerImpl(observer));
1✔
286

287
    // Delay any sources of cancellation after start(), because most of the transports are broken if
288
    // they receive cancel before start. Issue #1343 has more details
289

290
    // Propagate later Context cancellation to the remote side.
291
    cancellationHandler.setUp();
1✔
292
  }
1✔
293

294
  private void applyMethodConfig() {
295
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
296
    if (info == null) {
1✔
297
      return;
1✔
298
    }
299
    if (info.timeoutNanos != null) {
1✔
300
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
301
      Deadline existingDeadline = callOptions.getDeadline();
1✔
302
      // If the new deadline is sooner than the existing deadline, swap them.
303
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
304
        callOptions = callOptions.withDeadline(newDeadline);
1✔
305
      }
306
    }
307
    if (info.waitForReady != null) {
1✔
308
      callOptions =
1✔
309
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
310
    }
311
    if (info.maxInboundMessageSize != null) {
1✔
312
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
313
      if (existingLimit != null) {
×
314
        callOptions =
×
315
            callOptions.withMaxInboundMessageSize(
×
316
                Math.min(existingLimit, info.maxInboundMessageSize));
×
317
      } else {
318
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
319
      }
320
    }
321
    if (info.maxOutboundMessageSize != null) {
1✔
322
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
323
      if (existingLimit != null) {
×
324
        callOptions =
×
325
            callOptions.withMaxOutboundMessageSize(
×
326
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
327
      } else {
328
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
329
      }
330
    }
331
  }
1✔
332

333
  private final class CancellationHandler implements Runnable, CancellationListener {
334
    private final boolean contextIsDeadlineSource;
335
    private final boolean hasDeadline;
336
    private final long remainingNanos;
337
    private volatile ScheduledFuture<?> deadlineCancellationFuture;
338
    private volatile boolean tearDownCalled;
339

340
    CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
1✔
341
      this.contextIsDeadlineSource = contextIsDeadlineSource;
1✔
342
      if (deadline == null) {
1✔
343
        hasDeadline = false;
1✔
344
        remainingNanos = 0;
1✔
345
      } else {
346
        hasDeadline = true;
1✔
347
        remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
348
      }
349
    }
1✔
350

351
    void setUp() {
352
      if (tearDownCalled) {
1✔
353
        return;
1✔
354
      }
355
      if (hasDeadline
1✔
356
          // If the context has the effective deadline, we don't need to schedule an extra task.
357
          && !contextIsDeadlineSource
358
          // If the channel has been terminated, we don't need to schedule an extra task.
359
          && deadlineCancellationExecutor != null) {
1✔
360
        deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
1✔
361
            new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
362
      }
363
      context.addListener(this, directExecutor());
1✔
364
      if (tearDownCalled) {
1✔
365
        // Race detected! Re-run to make sure the future is cancelled and context listener removed
366
        tearDown();
×
367
      }
368
    }
1✔
369

370
    // May be called multiple times, and race with setUp()
371
    void tearDown() {
372
      tearDownCalled = true;
1✔
373
      ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
1✔
374
      if (deadlineCancellationFuture != null) {
1✔
375
        deadlineCancellationFuture.cancel(false);
1✔
376
      }
377
      context.removeListener(this);
1✔
378
    }
1✔
379

380
    @Override
381
    public void cancelled(Context context) {
382
      if (hasDeadline && contextIsDeadlineSource
1✔
383
          && context.cancellationCause() instanceof TimeoutException) {
1✔
384
        stream.cancel(formatDeadlineExceededStatus());
1✔
385
        return;
1✔
386
      }
387
      stream.cancel(statusFromCancelled(context));
1✔
388
    }
1✔
389

390
    @Override
391
    public void run() {
392
      stream.cancel(formatDeadlineExceededStatus());
1✔
393
    }
1✔
394

395
    Status formatDeadlineExceededStatus() {
396
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
397
      // stream is created.
398
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
399
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
400

401
      StringBuilder buf = new StringBuilder();
1✔
402
      buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
1✔
403
      buf.append(" deadline exceeded after ");
1✔
404
      if (remainingNanos < 0) {
1✔
405
        buf.append('-');
1✔
406
      }
407
      buf.append(seconds);
1✔
408
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
409
      buf.append("s. ");
1✔
410
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
411
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
1✔
412
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
413
      if (stream != null) {
1✔
414
        InsightBuilder insight = new InsightBuilder();
1✔
415
        stream.appendTimeoutInsight(insight);
1✔
416
        buf.append(" ");
1✔
417
        buf.append(insight);
1✔
418
      }
419
      return DEADLINE_EXCEEDED.withDescription(buf.toString());
1✔
420
    }
421
  }
422

423
  @Nullable
424
  private Deadline effectiveDeadline() {
425
    // Call options and context are immutable, so we don't need to cache the deadline.
426
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
427
  }
428

429
  @Nullable
430
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
431
    if (deadline0 == null) {
1✔
432
      return deadline1;
1✔
433
    }
434
    if (deadline1 == null) {
1✔
435
      return deadline0;
1✔
436
    }
437
    return deadline0.minimum(deadline1);
1✔
438
  }
439

440
  @Override
441
  public void request(int numMessages) {
442
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
443
      PerfMark.attachTag(tag);
1✔
444
      checkState(stream != null, "Not started");
1✔
445
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
446
      stream.request(numMessages);
1✔
447
    }
448
  }
1✔
449

450
  @Override
451
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
452
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
453
      PerfMark.attachTag(tag);
1✔
454
      cancelInternal(message, cause);
1✔
455
    }
456
  }
1✔
457

458
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
459
    if (message == null && cause == null) {
1✔
460
      cause = new CancellationException("Cancelled without a message or cause");
×
461
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
462
    }
463
    if (cancelCalled) {
1✔
464
      return;
1✔
465
    }
466
    cancelCalled = true;
1✔
467
    try {
468
      // Cancel is called in exception handling cases, so it may be the case that the
469
      // stream was never successfully created or start has never been called.
470
      if (stream != null) {
1✔
471
        Status status = Status.CANCELLED;
1✔
472
        if (message != null) {
1✔
473
          status = status.withDescription(message);
1✔
474
        } else {
475
          status = status.withDescription("Call cancelled without message");
1✔
476
        }
477
        if (cause != null) {
1✔
478
          status = status.withCause(cause);
1✔
479
        }
480
        stream.cancel(status);
1✔
481
      }
482
    } finally {
483
      // start() might not have been called
484
      if (cancellationHandler != null) {
1✔
485
        cancellationHandler.tearDown();
1✔
486
      }
487
    }
488
  }
1✔
489

490
  @Override
491
  public void halfClose() {
492
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
493
      PerfMark.attachTag(tag);
1✔
494
      halfCloseInternal();
1✔
495
    }
496
  }
1✔
497

498
  private void halfCloseInternal() {
499
    checkState(stream != null, "Not started");
1✔
500
    checkState(!cancelCalled, "call was cancelled");
1✔
501
    checkState(!halfCloseCalled, "call already half-closed");
1✔
502
    halfCloseCalled = true;
1✔
503
    stream.halfClose();
1✔
504
  }
1✔
505

506
  @Override
507
  public void sendMessage(ReqT message) {
508
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
509
      PerfMark.attachTag(tag);
1✔
510
      sendMessageInternal(message);
1✔
511
    }
512
  }
1✔
513

514
  private void sendMessageInternal(ReqT message) {
515
    checkState(stream != null, "Not started");
1✔
516
    checkState(!cancelCalled, "call was cancelled");
1✔
517
    checkState(!halfCloseCalled, "call was half-closed");
1✔
518
    try {
519
      if (stream instanceof RetriableStream) {
1✔
520
        @SuppressWarnings("unchecked")
521
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
522
        retriableStream.sendMessage(message);
1✔
523
      } else {
1✔
524
        stream.writeMessage(method.streamRequest(message));
1✔
525
      }
526
    } catch (RuntimeException e) {
1✔
527
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
528
      return;
1✔
529
    } catch (Error e) {
×
530
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
531
      throw e;
×
532
    }
1✔
533
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
534
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
535
    // possibility of broken applications forgetting to call halfClose without noticing.
536
    if (!unaryRequest) {
1✔
537
      stream.flush();
1✔
538
    }
539
  }
1✔
540

541
  @Override
542
  public void setMessageCompression(boolean enabled) {
543
    checkState(stream != null, "Not started");
1✔
544
    stream.setMessageCompression(enabled);
1✔
545
  }
1✔
546

547
  @Override
548
  public boolean isReady() {
549
    if (halfCloseCalled) {
1✔
550
      return false;
1✔
551
    }
552
    return stream.isReady();
1✔
553
  }
554

555
  @Override
556
  public Attributes getAttributes() {
557
    if (stream != null) {
1✔
558
      return stream.getAttributes();
1✔
559
    }
560
    return Attributes.EMPTY;
1✔
561
  }
562

563
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
564
    observer.onClose(status, trailers);
1✔
565
  }
1✔
566

567
  @Override
568
  public String toString() {
569
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
570
  }
571

572
  private class ClientStreamListenerImpl implements ClientStreamListener {
573
    private final Listener<RespT> observer;
574
    private Status exceptionStatus;
575

576
    public ClientStreamListenerImpl(Listener<RespT> observer) {
1✔
577
      this.observer = checkNotNull(observer, "observer");
1✔
578
    }
1✔
579

580
    /**
581
     * Cancels call and schedules onClose() notification. May only be called from the application
582
     * thread.
583
     */
584
    private void exceptionThrown(Status status) {
585
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
586
      // will be no further callbacks. We set the status here and overwrite the onClose() details
587
      // when it arrives.
588
      exceptionStatus = status;
1✔
589
      stream.cancel(status);
1✔
590
    }
1✔
591

592
    @Override
593
    public void headersRead(final Metadata headers) {
594
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
595
        PerfMark.attachTag(tag);
1✔
596
        final Link link = PerfMark.linkOut();
1✔
597
        final class HeadersRead extends ContextRunnable {
598
          HeadersRead() {
1✔
599
            super(context);
1✔
600
          }
1✔
601

602
          @Override
603
          public void runInContext() {
604
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
605
              PerfMark.attachTag(tag);
1✔
606
              PerfMark.linkIn(link);
1✔
607
              runInternal();
1✔
608
            }
609
          }
1✔
610

611
          private void runInternal() {
612
            if (exceptionStatus != null) {
1✔
613
              return;
×
614
            }
615
            try {
616
              observer.onHeaders(headers);
1✔
617
            } catch (Throwable t) {
1✔
618
              exceptionThrown(
1✔
619
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
620
            }
1✔
621
          }
1✔
622
        }
623

624
        callExecutor.execute(new HeadersRead());
1✔
625
      }
626
    }
1✔
627

628
    @Override
629
    public void messagesAvailable(final MessageProducer producer) {
630
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
631
        PerfMark.attachTag(tag);
1✔
632
        final Link link = PerfMark.linkOut();
1✔
633
        final class MessagesAvailable extends ContextRunnable {
634
          MessagesAvailable() {
1✔
635
            super(context);
1✔
636
          }
1✔
637

638
          @Override
639
          public void runInContext() {
640
            try (TaskCloseable ignore =
1✔
641
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
642
              PerfMark.attachTag(tag);
1✔
643
              PerfMark.linkIn(link);
1✔
644
              runInternal();
1✔
645
            }
646
          }
1✔
647

648
          private void runInternal() {
649
            if (exceptionStatus != null) {
1✔
650
              GrpcUtil.closeQuietly(producer);
×
651
              return;
×
652
            }
653
            try {
654
              InputStream message;
655
              while ((message = producer.next()) != null) {
1✔
656
                try {
657
                  observer.onMessage(method.parseResponse(message));
1✔
658
                } catch (Throwable t) {
1✔
659
                  GrpcUtil.closeQuietly(message);
1✔
660
                  throw t;
1✔
661
                }
1✔
662
                message.close();
1✔
663
              }
664
            } catch (Throwable t) {
1✔
665
              GrpcUtil.closeQuietly(producer);
1✔
666
              exceptionThrown(
1✔
667
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
668
            }
1✔
669
          }
1✔
670
        }
671

672
        callExecutor.execute(new MessagesAvailable());
1✔
673
      }
674
    }
1✔
675

676
    @Override
677
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
678
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
679
        PerfMark.attachTag(tag);
1✔
680
        closedInternal(status, rpcProgress, trailers);
1✔
681
      }
682
    }
1✔
683

684
    private void closedInternal(
685
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
686
      Deadline deadline = effectiveDeadline();
1✔
687
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
688
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
689
        // description. Since our timer may be delayed in firing, we double-check the deadline and
690
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
691
        if (deadline.isExpired()) {
1✔
692
          status = cancellationHandler.formatDeadlineExceededStatus();
1✔
693
          // Replace trailers to prevent mixing sources of status and trailers.
694
          trailers = new Metadata();
1✔
695
        }
696
      }
697
      final Status savedStatus = status;
1✔
698
      final Metadata savedTrailers = trailers;
1✔
699
      final Link link = PerfMark.linkOut();
1✔
700
      final class StreamClosed extends ContextRunnable {
701
        StreamClosed() {
1✔
702
          super(context);
1✔
703
        }
1✔
704

705
        @Override
706
        public void runInContext() {
707
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
708
            PerfMark.attachTag(tag);
1✔
709
            PerfMark.linkIn(link);
1✔
710
            runInternal();
1✔
711
          }
712
        }
1✔
713

714
        private void runInternal() {
715
          cancellationHandler.tearDown();
1✔
716
          Status status = savedStatus;
1✔
717
          Metadata trailers = savedTrailers;
1✔
718
          if (exceptionStatus != null) {
1✔
719
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
720
            // However the cancel is racy and this closed() may have already been queued when the
721
            // cancellation occurred. Since other calls like onMessage() will throw away data if
722
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
723
            // provided by the server.
724
            status = exceptionStatus;
1✔
725
            // Replace trailers to prevent mixing sources of status and trailers.
726
            trailers = new Metadata();
1✔
727
          }
728
          try {
729
            closeObserver(observer, status, trailers);
1✔
730
          } finally {
731
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
732
          }
733
        }
1✔
734
      }
735

736
      callExecutor.execute(new StreamClosed());
1✔
737
    }
1✔
738

739
    @Override
740
    public void onReady() {
741
      if (method.getType().clientSendsOneMessage()) {
1✔
742
        return;
1✔
743
      }
744
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
745
        PerfMark.attachTag(tag);
1✔
746
        final Link link = PerfMark.linkOut();
1✔
747

748
        final class StreamOnReady extends ContextRunnable {
749
          StreamOnReady() {
1✔
750
            super(context);
1✔
751
          }
1✔
752

753
          @Override
754
          public void runInContext() {
755
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
756
              PerfMark.attachTag(tag);
1✔
757
              PerfMark.linkIn(link);
1✔
758
              runInternal();
1✔
759
            }
760
          }
1✔
761

762
          private void runInternal() {
763
            if (exceptionStatus != null) {
1✔
764
              return;
×
765
            }
766
            try {
767
              observer.onReady();
1✔
768
            } catch (Throwable t) {
1✔
769
              exceptionThrown(
1✔
770
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
771
            }
1✔
772
          }
1✔
773
        }
774

775
        callExecutor.execute(new StreamOnReady());
1✔
776
      }
777
    }
1✔
778
  }
779
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc