• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19448

05 Sep 2024 09:39PM UTC coverage: 84.506% (-0.008%) from 84.514%
#19448

push

github

ejona86
core: touch() buffer when detach()ing

Detachable lets a buffer outlive its original lifetime. The new lifetime
is application-controlled. If the application fails to read/close the
stream, then the leak detector wouldn't make clear what code was
responsible for the buffer's lifetime. With this touch, we'll be able to
see detach() was called and thus know the application needs debugging.

Realized when looking at b/364531464, although I think the issue is
unrelated.

33533 of 39681 relevant lines covered (84.51%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.69
/../core/src/main/java/io/grpc/internal/ClientCallImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
24
import static io.grpc.Contexts.statusFromCancelled;
25
import static io.grpc.Status.DEADLINE_EXCEEDED;
26
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
27
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
28
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
29
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
30
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
31

32
import com.google.common.annotations.VisibleForTesting;
33
import com.google.common.base.MoreObjects;
34
import io.grpc.Attributes;
35
import io.grpc.CallOptions;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.Codec;
39
import io.grpc.Compressor;
40
import io.grpc.CompressorRegistry;
41
import io.grpc.Context;
42
import io.grpc.Context.CancellationListener;
43
import io.grpc.Deadline;
44
import io.grpc.DecompressorRegistry;
45
import io.grpc.InternalConfigSelector;
46
import io.grpc.InternalDecompressorRegistry;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.MethodDescriptor.MethodType;
50
import io.grpc.Status;
51
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52
import io.perfmark.Link;
53
import io.perfmark.PerfMark;
54
import io.perfmark.Tag;
55
import io.perfmark.TaskCloseable;
56
import java.io.InputStream;
57
import java.nio.charset.Charset;
58
import java.util.Locale;
59
import java.util.concurrent.CancellationException;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.ScheduledExecutorService;
62
import java.util.concurrent.ScheduledFuture;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Implementation of {@link ClientCall}.
71
 */
72
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
73

74
  private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
1✔
75
  private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
1✔
76
      = "gzip".getBytes(Charset.forName("US-ASCII"));
1✔
77
  private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
1✔
78

79
  private final MethodDescriptor<ReqT, RespT> method;
80
  private final Tag tag;
81
  private final Executor callExecutor;
82
  private final boolean callExecutorIsDirect;
83
  private final CallTracer channelCallsTracer;
84
  private final Context context;
85
  private CancellationHandler cancellationHandler;
86
  private final boolean unaryRequest;
87
  private CallOptions callOptions;
88
  private ClientStream stream;
89
  private boolean cancelCalled;
90
  private boolean halfCloseCalled;
91
  private final ClientStreamProvider clientStreamProvider;
92
  private final ScheduledExecutorService deadlineCancellationExecutor;
93
  private boolean fullStreamDecompression;
94
  private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
95
  private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
1✔
96

97
  ClientCallImpl(
98
      MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
99
      ClientStreamProvider clientStreamProvider,
100
      ScheduledExecutorService deadlineCancellationExecutor,
101
      CallTracer channelCallsTracer,
102
      // TODO(zdapeng): remove this arg
103
      @Nullable InternalConfigSelector configSelector) {
1✔
104
    this.method = method;
1✔
105
    // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
106
    this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
1✔
107
    // If we know that the executor is a direct executor, we don't need to wrap it with a
108
    // SerializingExecutor. This is purely for performance reasons.
109
    // See https://github.com/grpc/grpc-java/issues/368
110
    if (executor == directExecutor()) {
1✔
111
      this.callExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
112
      callExecutorIsDirect = true;
1✔
113
    } else {
114
      this.callExecutor = new SerializingExecutor(executor);
1✔
115
      callExecutorIsDirect = false;
1✔
116
    }
117
    this.channelCallsTracer = channelCallsTracer;
1✔
118
    // Propagate the context from the thread which initiated the call to all callbacks.
119
    this.context = Context.current();
1✔
120
    this.unaryRequest = method.getType() == MethodType.UNARY
1✔
121
        || method.getType() == MethodType.SERVER_STREAMING;
1✔
122
    this.callOptions = callOptions;
1✔
123
    this.clientStreamProvider = clientStreamProvider;
1✔
124
    this.deadlineCancellationExecutor = deadlineCancellationExecutor;
1✔
125
    PerfMark.event("ClientCall.<init>", tag);
1✔
126
  }
1✔
127

128
  /**
129
   * Provider of {@link ClientStream}s.
130
   */
131
  interface ClientStreamProvider {
132
    ClientStream newStream(
133
        MethodDescriptor<?, ?> method,
134
        CallOptions callOptions,
135
        Metadata headers,
136
        Context context);
137
  }
138

139
  ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140
    this.fullStreamDecompression = fullStreamDecompression;
1✔
141
    return this;
1✔
142
  }
143

144
  ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145
    this.decompressorRegistry = decompressorRegistry;
1✔
146
    return this;
1✔
147
  }
148

149
  ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150
    this.compressorRegistry = compressorRegistry;
1✔
151
    return this;
1✔
152
  }
153

154
  @VisibleForTesting
155
  static void prepareHeaders(
156
      Metadata headers,
157
      DecompressorRegistry decompressorRegistry,
158
      Compressor compressor,
159
      boolean fullStreamDecompression) {
160
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
161
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
162
    if (compressor != Codec.Identity.NONE) {
1✔
163
      headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
164
    }
165

166
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
167
    byte[] advertisedEncodings =
1✔
168
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
169
    if (advertisedEncodings.length != 0) {
1✔
170
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
171
    }
172

173
    headers.discardAll(CONTENT_ENCODING_KEY);
1✔
174
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
1✔
175
    if (fullStreamDecompression) {
1✔
176
      headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void start(Listener<RespT> observer, Metadata headers) {
182
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
1✔
183
      PerfMark.attachTag(tag);
1✔
184
      startInternal(observer, headers);
1✔
185
    }
186
  }
1✔
187

188
  private void startInternal(Listener<RespT> observer, Metadata headers) {
189
    checkState(stream == null, "Already started");
1✔
190
    checkState(!cancelCalled, "call was cancelled");
1✔
191
    checkNotNull(observer, "observer");
1✔
192
    checkNotNull(headers, "headers");
1✔
193

194
    if (context.isCancelled()) {
1✔
195
      // Context is already cancelled so no need to create a real stream, just notify the observer
196
      // of cancellation via callback on the executor
197
      stream = NoopClientStream.INSTANCE;
1✔
198
      final Listener<RespT> finalObserver = observer;
1✔
199
      class ClosedByContext extends ContextRunnable {
200
        ClosedByContext() {
1✔
201
          super(context);
1✔
202
        }
1✔
203

204
        @Override
205
        public void runInContext() {
206
          closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
1✔
207
        }
1✔
208
      }
209

210
      callExecutor.execute(new ClosedByContext());
1✔
211
      return;
1✔
212
    }
213
    applyMethodConfig();
1✔
214
    final String compressorName = callOptions.getCompressor();
1✔
215
    Compressor compressor;
216
    if (compressorName != null) {
1✔
217
      compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
218
      if (compressor == null) {
1✔
219
        stream = NoopClientStream.INSTANCE;
×
220
        final Listener<RespT> finalObserver = observer;
×
221
        class ClosedByNotFoundCompressor extends ContextRunnable {
222
          ClosedByNotFoundCompressor() {
×
223
            super(context);
×
224
          }
×
225

226
          @Override
227
          public void runInContext() {
228
            closeObserver(
×
229
                finalObserver,
230
                Status.INTERNAL.withDescription(
×
231
                    String.format("Unable to find compressor by name %s", compressorName)),
×
232
                new Metadata());
233
          }
×
234
        }
235

236
        callExecutor.execute(new ClosedByNotFoundCompressor());
×
237
        return;
×
238
      }
239
    } else {
240
      compressor = Codec.Identity.NONE;
1✔
241
    }
242
    prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
1✔
243

244
    Deadline effectiveDeadline = effectiveDeadline();
1✔
245
    boolean contextIsDeadlineSource = effectiveDeadline != null
1✔
246
        && effectiveDeadline.equals(context.getDeadline());
1✔
247
    cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
1✔
248
    boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
1✔
249
    if (!deadlineExceeded) {
1✔
250
      stream = clientStreamProvider.newStream(method, callOptions, headers, context);
1✔
251
    } else {
252
      ClientStreamTracer[] tracers =
1✔
253
          GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
1✔
254
      String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
1✔
255
      Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
256
      String description = String.format(
1✔
257
          "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
258
              + "Name resolution delay %.9f seconds.", deadlineName,
259
          cancellationHandler.remainingNanos / NANO_TO_SECS,
1✔
260
          nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
1✔
261
      stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
1✔
262
    }
263

264
    if (callExecutorIsDirect) {
1✔
265
      stream.optimizeForDirectExecutor();
1✔
266
    }
267
    if (callOptions.getAuthority() != null) {
1✔
268
      stream.setAuthority(callOptions.getAuthority());
1✔
269
    }
270
    if (callOptions.getMaxInboundMessageSize() != null) {
1✔
271
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
1✔
272
    }
273
    if (callOptions.getMaxOutboundMessageSize() != null) {
1✔
274
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
1✔
275
    }
276
    if (effectiveDeadline != null) {
1✔
277
      stream.setDeadline(effectiveDeadline);
1✔
278
    }
279
    stream.setCompressor(compressor);
1✔
280
    if (fullStreamDecompression) {
1✔
281
      stream.setFullStreamDecompression(fullStreamDecompression);
×
282
    }
283
    stream.setDecompressorRegistry(decompressorRegistry);
1✔
284
    channelCallsTracer.reportCallStarted();
1✔
285
    stream.start(new ClientStreamListenerImpl(observer));
1✔
286

287
    // Delay any sources of cancellation after start(), because most of the transports are broken if
288
    // they receive cancel before start. Issue #1343 has more details
289

290
    // Propagate later Context cancellation to the remote side.
291
    cancellationHandler.setUp();
1✔
292
  }
1✔
293

294
  private void applyMethodConfig() {
295
    MethodInfo info = callOptions.getOption(MethodInfo.KEY);
1✔
296
    if (info == null) {
1✔
297
      return;
1✔
298
    }
299
    if (info.timeoutNanos != null) {
1✔
300
      Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
301
      Deadline existingDeadline = callOptions.getDeadline();
1✔
302
      // If the new deadline is sooner than the existing deadline, swap them.
303
      if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
1✔
304
        callOptions = callOptions.withDeadline(newDeadline);
1✔
305
      }
306
    }
307
    if (info.waitForReady != null) {
1✔
308
      callOptions =
1✔
309
          info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
1✔
310
    }
311
    if (info.maxInboundMessageSize != null) {
1✔
312
      Integer existingLimit = callOptions.getMaxInboundMessageSize();
×
313
      if (existingLimit != null) {
×
314
        callOptions =
×
315
            callOptions.withMaxInboundMessageSize(
×
316
                Math.min(existingLimit, info.maxInboundMessageSize));
×
317
      } else {
318
        callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
×
319
      }
320
    }
321
    if (info.maxOutboundMessageSize != null) {
1✔
322
      Integer existingLimit = callOptions.getMaxOutboundMessageSize();
×
323
      if (existingLimit != null) {
×
324
        callOptions =
×
325
            callOptions.withMaxOutboundMessageSize(
×
326
                Math.min(existingLimit, info.maxOutboundMessageSize));
×
327
      } else {
328
        callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
×
329
      }
330
    }
331
  }
1✔
332

333
  private final class CancellationHandler implements Runnable, CancellationListener {
334
    private final boolean contextIsDeadlineSource;
335
    private final boolean hasDeadline;
336
    private final long remainingNanos;
337
    private volatile ScheduledFuture<?> deadlineCancellationFuture;
338
    private volatile boolean tearDownCalled;
339

340
    CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
1✔
341
      this.contextIsDeadlineSource = contextIsDeadlineSource;
1✔
342
      if (deadline == null) {
1✔
343
        hasDeadline = false;
1✔
344
        remainingNanos = 0;
1✔
345
      } else {
346
        hasDeadline = true;
1✔
347
        remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
348
      }
349
    }
1✔
350

351
    void setUp() {
352
      if (tearDownCalled) {
1✔
353
        return;
1✔
354
      }
355
      if (hasDeadline
1✔
356
          // If the context has the effective deadline, we don't need to schedule an extra task.
357
          && !contextIsDeadlineSource
358
          // If the channel has been terminated, we don't need to schedule an extra task.
359
          && deadlineCancellationExecutor != null) {
1✔
360
        deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
1✔
361
            new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
362
      }
363
      context.addListener(this, directExecutor());
1✔
364
      if (tearDownCalled) {
1✔
365
        // Race detected! Re-run to make sure the future is cancelled and context listener removed
366
        tearDown();
×
367
      }
368
    }
1✔
369

370
    // May be called multiple times, and race with setUp()
371
    void tearDown() {
372
      tearDownCalled = true;
1✔
373
      ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
1✔
374
      if (deadlineCancellationFuture != null) {
1✔
375
        deadlineCancellationFuture.cancel(false);
1✔
376
      }
377
      context.removeListener(this);
1✔
378
    }
1✔
379

380
    @Override
381
    public void cancelled(Context context) {
382
      if (hasDeadline && contextIsDeadlineSource
1✔
383
          && context.cancellationCause() instanceof TimeoutException) {
1✔
384
        stream.cancel(formatDeadlineExceededStatus());
1✔
385
        return;
1✔
386
      }
387
      stream.cancel(statusFromCancelled(context));
1✔
388
    }
1✔
389

390
    @Override
391
    public void run() {
392
      stream.cancel(formatDeadlineExceededStatus());
1✔
393
    }
1✔
394

395
    Status formatDeadlineExceededStatus() {
396
      // DelayedStream.cancel() is safe to call from a thread that is different from where the
397
      // stream is created.
398
      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
399
      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
400

401
      StringBuilder buf = new StringBuilder();
1✔
402
      buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
1✔
403
      buf.append(" deadline exceeded after ");
1✔
404
      if (remainingNanos < 0) {
1✔
405
        buf.append('-');
1✔
406
      }
407
      buf.append(seconds);
1✔
408
      buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
409
      buf.append("s. ");
1✔
410
      Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
1✔
411
      buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
1✔
412
          nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
1✔
413
      if (stream != null) {
1✔
414
        InsightBuilder insight = new InsightBuilder();
1✔
415
        stream.appendTimeoutInsight(insight);
1✔
416
        buf.append(" ");
1✔
417
        buf.append(insight);
1✔
418
      }
419
      return DEADLINE_EXCEEDED.withDescription(buf.toString());
1✔
420
    }
421
  }
422

423
  @Nullable
424
  private Deadline effectiveDeadline() {
425
    // Call options and context are immutable, so we don't need to cache the deadline.
426
    return min(callOptions.getDeadline(), context.getDeadline());
1✔
427
  }
428

429
  @Nullable
430
  private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
431
    if (deadline0 == null) {
1✔
432
      return deadline1;
1✔
433
    }
434
    if (deadline1 == null) {
1✔
435
      return deadline0;
1✔
436
    }
437
    return deadline0.minimum(deadline1);
1✔
438
  }
439

440
  @Override
441
  public void request(int numMessages) {
442
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
1✔
443
      PerfMark.attachTag(tag);
1✔
444
      checkState(stream != null, "Not started");
1✔
445
      checkArgument(numMessages >= 0, "Number requested must be non-negative");
1✔
446
      stream.request(numMessages);
1✔
447
    }
448
  }
1✔
449

450
  @Override
451
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
452
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
1✔
453
      PerfMark.attachTag(tag);
1✔
454
      cancelInternal(message, cause);
1✔
455
    }
456
  }
1✔
457

458
  private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
459
    if (message == null && cause == null) {
1✔
460
      cause = new CancellationException("Cancelled without a message or cause");
×
461
      log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
×
462
    }
463
    if (cancelCalled) {
1✔
464
      return;
1✔
465
    }
466
    cancelCalled = true;
1✔
467
    try {
468
      // Cancel is called in exception handling cases, so it may be the case that the
469
      // stream was never successfully created or start has never been called.
470
      if (stream != null) {
1✔
471
        Status status = Status.CANCELLED;
1✔
472
        if (message != null) {
1✔
473
          status = status.withDescription(message);
1✔
474
        } else {
475
          status = status.withDescription("Call cancelled without message");
1✔
476
        }
477
        if (cause != null) {
1✔
478
          status = status.withCause(cause);
1✔
479
        }
480
        stream.cancel(status);
1✔
481
      }
482
    } finally {
483
      // start() might not have been called
484
      if (cancellationHandler != null) {
1✔
485
        cancellationHandler.tearDown();
1✔
486
      }
487
    }
488
  }
1✔
489

490
  @Override
491
  public void halfClose() {
492
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
1✔
493
      PerfMark.attachTag(tag);
1✔
494
      halfCloseInternal();
1✔
495
    }
496
  }
1✔
497

498
  private void halfCloseInternal() {
499
    checkState(stream != null, "Not started");
1✔
500
    checkState(!cancelCalled, "call was cancelled");
1✔
501
    checkState(!halfCloseCalled, "call already half-closed");
1✔
502
    halfCloseCalled = true;
1✔
503
    stream.halfClose();
1✔
504
  }
1✔
505

506
  @Override
507
  public void sendMessage(ReqT message) {
508
    try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
1✔
509
      PerfMark.attachTag(tag);
1✔
510
      sendMessageInternal(message);
1✔
511
    }
512
  }
1✔
513

514
  private void sendMessageInternal(ReqT message) {
515
    checkState(stream != null, "Not started");
1✔
516
    checkState(!cancelCalled, "call was cancelled");
1✔
517
    checkState(!halfCloseCalled, "call was half-closed");
1✔
518
    try {
519
      if (stream instanceof RetriableStream) {
1✔
520
        @SuppressWarnings("unchecked")
521
        RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
1✔
522
        retriableStream.sendMessage(message);
1✔
523
      } else {
1✔
524
        stream.writeMessage(method.streamRequest(message));
1✔
525
      }
526
    } catch (RuntimeException e) {
1✔
527
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
1✔
528
      return;
1✔
529
    } catch (Error e) {
×
530
      stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
×
531
      throw e;
×
532
    }
1✔
533
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
534
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
535
    // possibility of broken applications forgetting to call halfClose without noticing.
536
    if (!unaryRequest) {
1✔
537
      stream.flush();
1✔
538
    }
539
  }
1✔
540

541
  @Override
542
  public void setMessageCompression(boolean enabled) {
543
    checkState(stream != null, "Not started");
1✔
544
    stream.setMessageCompression(enabled);
1✔
545
  }
1✔
546

547
  @Override
548
  public boolean isReady() {
549
    if (halfCloseCalled) {
1✔
550
      return false;
1✔
551
    }
552
    return stream.isReady();
1✔
553
  }
554

555
  @Override
556
  public Attributes getAttributes() {
557
    if (stream != null) {
1✔
558
      return stream.getAttributes();
1✔
559
    }
560
    return Attributes.EMPTY;
1✔
561
  }
562

563
  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
564
    observer.onClose(status, trailers);
1✔
565
  }
1✔
566

567
  @Override
568
  public String toString() {
569
    return MoreObjects.toStringHelper(this).add("method", method).toString();
×
570
  }
571

572
  private class ClientStreamListenerImpl implements ClientStreamListener {
573
    private final Listener<RespT> observer;
574
    private Status exceptionStatus;
575

576
    public ClientStreamListenerImpl(Listener<RespT> observer) {
1✔
577
      this.observer = checkNotNull(observer, "observer");
1✔
578
    }
1✔
579

580
    /**
581
     * Cancels call and schedules onClose() notification. May only be called from the application
582
     * thread.
583
     */
584
    private void exceptionThrown(Status status) {
585
      // Since each RPC can have its own executor, we can only call onClose() when we are sure there
586
      // will be no further callbacks. We set the status here and overwrite the onClose() details
587
      // when it arrives.
588
      exceptionStatus = status;
1✔
589
      stream.cancel(status);
1✔
590
    }
1✔
591

592
    @Override
593
    public void headersRead(final Metadata headers) {
594
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
1✔
595
        PerfMark.attachTag(tag);
1✔
596
        final Link link = PerfMark.linkOut();
1✔
597
        final class HeadersRead extends ContextRunnable {
598
          HeadersRead() {
1✔
599
            super(context);
1✔
600
          }
1✔
601

602
          @Override
603
          public void runInContext() {
604
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
1✔
605
              PerfMark.attachTag(tag);
1✔
606
              PerfMark.linkIn(link);
1✔
607
              runInternal();
1✔
608
            }
609
          }
1✔
610

611
          private void runInternal() {
612
            if (exceptionStatus != null) {
1✔
613
              return;
×
614
            }
615
            try {
616
              observer.onHeaders(headers);
1✔
617
            } catch (Throwable t) {
1✔
618
              exceptionThrown(
1✔
619
                  Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
1✔
620
            }
1✔
621
          }
1✔
622
        }
623

624
        callExecutor.execute(new HeadersRead());
1✔
625
      }
626
    }
1✔
627

628
    @Override
629
    public void messagesAvailable(final MessageProducer producer) {
630
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
1✔
631
        PerfMark.attachTag(tag);
1✔
632
        final Link link = PerfMark.linkOut();
1✔
633
        final class MessagesAvailable extends ContextRunnable {
634
          MessagesAvailable() {
1✔
635
            super(context);
1✔
636
          }
1✔
637

638
          @Override
639
          public void runInContext() {
640
            try (TaskCloseable ignore =
1✔
641
                     PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
1✔
642
              PerfMark.attachTag(tag);
1✔
643
              PerfMark.linkIn(link);
1✔
644
              runInternal();
1✔
645
            }
646
          }
1✔
647

648
          private void runInternal() {
649
            if (exceptionStatus != null) {
1✔
650
              GrpcUtil.closeQuietly(producer);
×
651
              return;
×
652
            }
653
            try {
654
              InputStream message;
655
              while ((message = producer.next()) != null) {
1✔
656
                try {
657
                  observer.onMessage(method.parseResponse(message));
1✔
658
                } catch (Throwable t) {
1✔
659
                  GrpcUtil.closeQuietly(message);
1✔
660
                  throw t;
1✔
661
                }
1✔
662
                message.close();
1✔
663
              }
664
            } catch (Throwable t) {
1✔
665
              GrpcUtil.closeQuietly(producer);
1✔
666
              exceptionThrown(
1✔
667
                  Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
1✔
668
            }
1✔
669
          }
1✔
670
        }
671

672
        callExecutor.execute(new MessagesAvailable());
1✔
673
      }
674
    }
1✔
675

676
    @Override
677
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
678
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
1✔
679
        PerfMark.attachTag(tag);
1✔
680
        closedInternal(status, rpcProgress, trailers);
1✔
681
      }
682
    }
1✔
683

684
    private void closedInternal(
685
        Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
686
      Deadline deadline = effectiveDeadline();
1✔
687
      if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
1✔
688
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
689
        // description. Since our timer may be delayed in firing, we double-check the deadline and
690
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
691
        if (deadline.isExpired()) {
1✔
692
          status = cancellationHandler.formatDeadlineExceededStatus();
1✔
693
          // Replace trailers to prevent mixing sources of status and trailers.
694
          trailers = new Metadata();
1✔
695
        }
696
      }
697
      final Status savedStatus = status;
1✔
698
      final Metadata savedTrailers = trailers;
1✔
699
      final Link link = PerfMark.linkOut();
1✔
700
      final class StreamClosed extends ContextRunnable {
701
        StreamClosed() {
1✔
702
          super(context);
1✔
703
        }
1✔
704

705
        @Override
706
        public void runInContext() {
707
          try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
1✔
708
            PerfMark.attachTag(tag);
1✔
709
            PerfMark.linkIn(link);
1✔
710
            runInternal();
1✔
711
          }
712
        }
1✔
713

714
        private void runInternal() {
715
          cancellationHandler.tearDown();
1✔
716
          Status status = savedStatus;
1✔
717
          Metadata trailers = savedTrailers;
1✔
718
          if (exceptionStatus != null) {
1✔
719
            // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
720
            // However the cancel is racy and this closed() may have already been queued when the
721
            // cancellation occurred. Since other calls like onMessage() will throw away data if
722
            // exceptionStatus != null, it is semantically essential that we _not_ use a status
723
            // provided by the server.
724
            status = exceptionStatus;
1✔
725
            // Replace trailers to prevent mixing sources of status and trailers.
726
            trailers = new Metadata();
1✔
727
          }
728
          try {
729
            closeObserver(observer, status, trailers);
1✔
730
          } finally {
731
            channelCallsTracer.reportCallEnded(status.isOk());
1✔
732
          }
733
        }
1✔
734
      }
735

736
      callExecutor.execute(new StreamClosed());
1✔
737
    }
1✔
738

739
    @Override
740
    public void onReady() {
741
      if (method.getType().clientSendsOneMessage()) {
1✔
742
        return;
1✔
743
      }
744
      try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
1✔
745
        PerfMark.attachTag(tag);
1✔
746
        final Link link = PerfMark.linkOut();
1✔
747

748
        final class StreamOnReady extends ContextRunnable {
749
          StreamOnReady() {
1✔
750
            super(context);
1✔
751
          }
1✔
752

753
          @Override
754
          public void runInContext() {
755
            try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
1✔
756
              PerfMark.attachTag(tag);
1✔
757
              PerfMark.linkIn(link);
1✔
758
              runInternal();
1✔
759
            }
760
          }
1✔
761

762
          private void runInternal() {
763
            if (exceptionStatus != null) {
1✔
764
              return;
×
765
            }
766
            try {
767
              observer.onReady();
1✔
768
            } catch (Throwable t) {
1✔
769
              exceptionThrown(
1✔
770
                  Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
1✔
771
            }
1✔
772
          }
1✔
773
        }
774

775
        callExecutor.execute(new StreamOnReady());
1✔
776
      }
777
    }
1✔
778
  }
779
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc