• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20005

01 Oct 2025 02:05AM UTC coverage: 88.585% (+0.003%) from 88.582%
#20005

push

github

web-flow
netty: Unconditionally disable adaptive cumulator (#12390)

io.netty.util.Version is unreliable, so we stop using it. grpc-netty and
grpc-netty-shaded have their version.properties mixed together, and you can't tell
which is which.

Changed the tests to use assume, so it is clear in the results that they
weren't run.

34859 of 39351 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.22
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Preconditions;
27
import com.google.common.io.ByteStreams;
28
import io.grpc.Attributes;
29
import io.grpc.CallOptions;
30
import io.grpc.Codec;
31
import io.grpc.Compressor;
32
import io.grpc.Deadline;
33
import io.grpc.Decompressor;
34
import io.grpc.DecompressorRegistry;
35
import io.grpc.Grpc;
36
import io.grpc.Metadata;
37
import io.grpc.Status;
38
import io.grpc.internal.ClientStreamListener.RpcProgress;
39
import java.io.InputStream;
40
import java.util.concurrent.TimeUnit;
41
import java.util.logging.Level;
42
import java.util.logging.Logger;
43
import javax.annotation.Nullable;
44

45
/**
46
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
47
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
48
 * from the sending application thread.
49
 */
50
public abstract class AbstractClientStream extends AbstractStream
51
    implements ClientStream, MessageFramer.Sink {
52

53
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
54

55
  /**
56
   * A sink for outbound operations, separated from the stream simply to avoid name
57
   * collisions/confusion. Only called from application thread.
58
   */
59
  protected interface Sink {
60
    /** 
61
     * Sends the request headers to the remote end point.
62
     *
63
     * @param metadata the metadata to be sent
64
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
65
     *     when sending an unary GET request
66
     */
67
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
68

69
    /**
70
     * Sends an outbound frame to the remote end point.
71
     *
72
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
73
     *     endOfStream} with no data to send
74
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
75
     *     {@code true} if this is {@code true}
76
     * @param flush {@code true} if more data may not be arriving soon
77
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
78
     */
79
    void writeFrame(
80
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
81

82
    /**
83
     * Tears down the stream, typically in the event of a timeout. This method may be called
84
     * multiple times and from any thread.
85
     *
86
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
87
     * {@link AbstractClientStream#cancel} delegates to this method.
88
     */
89
    void cancel(Status status);
90
  }
91

92
  private final TransportTracer transportTracer;
93
  private final Framer framer;
94
  private final boolean shouldBeCountedForInUse;
95
  private final boolean useGet;
96
  private Metadata headers;
97
  /**
98
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
99
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
100
   * processed asynchronously.
101
   */
102
  private volatile boolean cancelled;
103

104
  /**
   * Creates the stream and picks the framer that matches the request style.
   *
   * <p>With {@code useGet} the headers are handed to a {@code GetFramer}, which writes them
   * together with the payload when it is closed. Otherwise a {@link MessageFramer} is created and
   * the headers are kept on the stream until {@link #start} writes them.
   *
   * @param bufferAllocator allocator passed to the {@link MessageFramer}
   * @param statsTraceCtx stats context passed to whichever framer is created
   * @param transportTracer tracer for this stream, must not be {@code null}
   * @param headers the request headers, must not be {@code null}
   * @param callOptions options used to decide whether the stream counts towards in-use state
   * @param useGet {@code true} to send the request through the {@code GetFramer}
   */
  @SuppressWarnings("this-escape")
  protected AbstractClientStream(
      WritableBufferAllocator bufferAllocator,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer,
      Metadata headers,
      CallOptions callOptions,
      boolean useGet) {
    checkNotNull(headers, "headers");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
    this.useGet = useGet;
    if (useGet) {
      // The GET framer owns the headers; the stream-level field stays unset.
      this.framer = new GetFramer(headers, statsTraceCtx);
    } else {
      this.framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
      this.headers = headers;
    }
  }
1✔
123

124
  @Override
125
  public void setDeadline(Deadline deadline) {
126
    headers.discardAll(TIMEOUT_KEY);
1✔
127
    headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
128
  }
1✔
129

130
  @Override
131
  public void setMaxOutboundMessageSize(int maxSize) {
132
    framer.setMaxOutboundMessageSize(maxSize);
1✔
133
  }
1✔
134

135
  @Override
136
  public void setMaxInboundMessageSize(int maxSize) {
137
    transportState().setMaxInboundMessageSize(maxSize);
1✔
138
  }
1✔
139

140
  @Override
141
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
142
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
143
  }
1✔
144

145
  @Override
146
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
147
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
148
  }
1✔
149

150
  /** {@inheritDoc} */
151
  @Override
152
  protected abstract TransportState transportState();
153

154
  @Override
155
  public final void start(ClientStreamListener listener) {
156
    transportState().setListener(listener);
1✔
157
    if (!useGet) {
1✔
158
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
159
      headers = null;
1✔
160
    }
161
  }
1✔
162

163
  /**
164
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
165
   * unique sink.
166
   */
167
  protected abstract Sink abstractClientStreamSink();
168

169
  @Override
170
  protected final Framer framer() {
171
    return framer;
1✔
172
  }
173

174
  /**
175
   * Returns true if this stream should be counted when determining the in-use state of the
176
   * transport.
177
   */
178
  public final boolean shouldBeCountedForInUse() {
179
    return shouldBeCountedForInUse;
1✔
180
  }
181

182
  @Override
183
  public final void deliverFrame(
184
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
185
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
186
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
187
  }
1✔
188

189
  @Override
190
  public final void halfClose() {
191
    if (!transportState().isOutboundClosed()) {
1✔
192
      transportState().setOutboundClosed();
1✔
193
      endOfMessages();
1✔
194
    }
195
  }
1✔
196

197
  @Override
198
  public final void cancel(Status reason) {
199
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
200
    cancelled = true;
1✔
201
    abstractClientStreamSink().cancel(reason);
1✔
202
  }
1✔
203

204
  @Override
205
  public final boolean isReady() {
206
    return super.isReady() && !cancelled;
1✔
207
  }
208

209
  @Override
210
  public final void appendTimeoutInsight(InsightBuilder insight) {
211
    Attributes attrs = getAttributes();
1✔
212
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
213
  }
1✔
214

215
  protected TransportTracer getTransportTracer() {
216
    return transportTracer;
1✔
217
  }
218

219
  /** This should only be called from the transport thread. */
220
  protected abstract static class TransportState extends AbstractStream.TransportState {
221
    /** Whether listener.closed() has been called. */
222
    private final StatsTraceContext statsTraceCtx;
223
    private boolean listenerClosed;
224
    private ClientStreamListener listener;
225
    private boolean fullStreamDecompression;
226
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
227

228
    private boolean deframerClosed = false;
1✔
229
    private Runnable deframerClosedTask;
230

231
    /** Whether the client has half-closed the stream. */
232
    private volatile boolean outboundClosed;
233

234
    /**
235
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
236
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
237
     */
238
    private boolean statusReported;
239
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
240
    private boolean statusReportedIsOk;
241

242
    protected TransportState(
243
        int maxMessageSize,
244
        StatsTraceContext statsTraceCtx,
245
        TransportTracer transportTracer,
246
        CallOptions options) {
247
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
248
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
249
      if (options.getOnReadyThreshold() != null) {
1✔
250
        this.setOnReadyThreshold(options.getOnReadyThreshold());
1✔
251
      }
252
    }
1✔
253

254
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
255
      this.fullStreamDecompression = fullStreamDecompression;
1✔
256
    }
1✔
257

258
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
259
      checkState(this.listener == null, "Already called start");
1✔
260
      this.decompressorRegistry =
1✔
261
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
262
    }
1✔
263

264
    @VisibleForTesting
265
    public final void setListener(ClientStreamListener listener) {
266
      checkState(this.listener == null, "Already called setListener");
1✔
267
      this.listener = checkNotNull(listener, "listener");
1✔
268
    }
1✔
269

270
    @Override
271
    public void deframerClosed(boolean hasPartialMessage) {
272
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
273
      deframerClosed = true;
1✔
274
      if (statusReportedIsOk && hasPartialMessage) {
1✔
275
        transportReportStatus(
1✔
276
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
277
            true,
278
            new Metadata());
279
      }
280
      if (deframerClosedTask != null) {
1✔
281
        deframerClosedTask.run();
1✔
282
        deframerClosedTask = null;
1✔
283
      }
284
    }
1✔
285

286
    @Override
287
    protected final ClientStreamListener listener() {
288
      return listener;
1✔
289
    }
290

291
    private final void setOutboundClosed() {
292
      outboundClosed = true;
1✔
293
    }
1✔
294

295
    protected final boolean isOutboundClosed() {
296
      return outboundClosed;
1✔
297
    }
298

299
    /**
300
     * Called by transport implementations when they receive headers.
301
     *
302
     * @param headers the parsed headers
303
     */
304
    protected void inboundHeadersReceived(Metadata headers) {
305
      checkState(!statusReported, "Received headers on closed stream");
1✔
306
      statsTraceCtx.clientInboundHeaders(headers);
1✔
307

308
      boolean compressedStream = false;
1✔
309
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
310
      if (fullStreamDecompression && streamEncoding != null) {
1✔
311
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
312
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
313
          compressedStream = true;
1✔
314
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
315
          deframeFailed(
1✔
316
              Status.INTERNAL
317
                  .withDescription(
1✔
318
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
319
                  .asRuntimeException());
1✔
320
          return;
1✔
321
        }
322
      }
323

324
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
325
      if (messageEncoding != null) {
1✔
326
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
327
        if (decompressor == null) {
1✔
328
          deframeFailed(
1✔
329
              Status.INTERNAL
330
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
331
                  .asRuntimeException());
1✔
332
          return;
1✔
333
        } else if (decompressor != Codec.Identity.NONE) {
1✔
334
          if (compressedStream) {
1✔
335
            deframeFailed(
1✔
336
                Status.INTERNAL
337
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
338
                    .asRuntimeException());
1✔
339
            return;
1✔
340
          }
341
          setDecompressor(decompressor);
1✔
342
        }
343
      }
344

345
      listener().headersRead(headers);
1✔
346
    }
1✔
347

348
    /**
349
     * Processes the contents of a received data frame from the server.
350
     *
351
     * @param frame the received data frame. Its ownership is transferred to this method.
352
     */
353
    protected void inboundDataReceived(ReadableBuffer frame) {
354
      checkNotNull(frame, "frame");
1✔
355
      boolean needToCloseFrame = true;
1✔
356
      try {
357
        if (statusReported) {
1✔
358
          log.log(Level.INFO, "Received data on closed stream");
×
359
          return;
×
360
        }
361

362
        needToCloseFrame = false;
1✔
363
        deframe(frame);
1✔
364
      } finally {
365
        if (needToCloseFrame) {
1✔
366
          frame.close();
×
367
        }
368
      }
369
    }
1✔
370

371
    /**
372
     * Processes the trailers and status from the server.
373
     *
374
     * @param trailers the received trailers
375
     * @param status the status extracted from the trailers
376
     */
377
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
378
      checkNotNull(status, "status");
1✔
379
      checkNotNull(trailers, "trailers");
1✔
380
      if (statusReported) {
1✔
381
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
382
            new Object[]{status, trailers});
383
        return;
1✔
384
      }
385
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
386
      transportReportStatus(status, false, trailers);
1✔
387
    }
1✔
388

389
    /**
390
     * Report stream closure with status to the application layer if not already reported. This
391
     * method must be called from the transport thread.
392
     *
393
     * @param status the new status to set
394
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
395
     *        may already be queued up in the deframer. If {@code false}, the listener will be
396
     *        notified immediately after all currently completed messages in the deframer have been
397
     *        delivered to the application.
398
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
399
     *        server
400
     */
401
    public final void transportReportStatus(final Status status, boolean stopDelivery,
402
        final Metadata trailers) {
403
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
404
    }
1✔
405

406
    /**
407
     * Report stream closure with status to the application layer if not already reported. This
408
     * method must be called from the transport thread.
409
     *
410
     * @param status the new status to set
411
     * @param rpcProgress RPC progress that the
412
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
413
     *        will receive
414
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
415
     *        may already be queued up in the deframer and overrides any previously queued status.
416
     *        If {@code false}, the listener will be notified immediately after all currently
417
     *        completed messages in the deframer have been delivered to the application.
418
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
419
     *        server
420
     */
421
    public final void transportReportStatus(
422
        final Status status,
423
        final RpcProgress rpcProgress,
424
        boolean stopDelivery,
425
        final Metadata trailers) {
426
      checkNotNull(status, "status");
1✔
427
      checkNotNull(trailers, "trailers");
1✔
428
      // If stopDelivery, we continue in case previous invocation is waiting for stall
429
      if (statusReported && !stopDelivery) {
1✔
430
        return;
1✔
431
      }
432
      statusReported = true;
1✔
433
      statusReportedIsOk = status.isOk();
1✔
434
      onStreamDeallocated();
1✔
435

436
      if (deframerClosed) {
1✔
437
        deframerClosedTask = null;
1✔
438
        closeListener(status, rpcProgress, trailers);
1✔
439
      } else {
440
        deframerClosedTask =
1✔
441
            new Runnable() {
1✔
442
              @Override
443
              public void run() {
444
                closeListener(status, rpcProgress, trailers);
1✔
445
              }
1✔
446
            };
447
        closeDeframer(stopDelivery);
1✔
448
      }
449
    }
1✔
450

451
    /**
452
     * Closes the listener if not previously closed.
453
     *
454
     * @throws IllegalStateException if the call has not yet been started.
455
     */
456
    private void closeListener(
457
        Status status, RpcProgress rpcProgress, Metadata trailers) {
458
      if (!listenerClosed) {
1✔
459
        listenerClosed = true;
1✔
460
        statsTraceCtx.streamClosed(status);
1✔
461
        if (getTransportTracer() != null) {
1✔
462
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
463
        }
464
        listener().closed(status, rpcProgress, trailers);
1✔
465
      }
466
    }
1✔
467
  }
468

469
  private class GetFramer implements Framer {
470
    private Metadata headers;
471
    private boolean closed;
472
    private final StatsTraceContext statsTraceCtx;
473
    private byte[] payload;
474

475
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
476
      this.headers = checkNotNull(headers, "headers");
1✔
477
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
478
    }
1✔
479

480
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
481
    @Override
482
    public void writePayload(InputStream message) {
483
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
484
      try {
485
        payload = ByteStreams.toByteArray(message);
1✔
486
      } catch (java.io.IOException ex) {
×
487
        throw new RuntimeException(ex);
×
488
      }
1✔
489
      statsTraceCtx.outboundMessage(0);
1✔
490
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
491
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
492
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
493
      // it using e.g., base64.  However, we are not supposed to know such detail here.
494
      //
495
      // We don't want to move this line to where the encoding happens either, because we'd better
496
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
497
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
498
      //
499
      // Because the payload is usually very small, people shouldn't care about the size difference
500
      // caused by encoding.
501
      statsTraceCtx.outboundWireSize(payload.length);
1✔
502
    }
1✔
503

504
    @Override
505
    public void flush() {}
1✔
506

507
    @Override
508
    public boolean isClosed() {
509
      return closed;
1✔
510
    }
511

512
    /** Closes, with flush. */
513
    @Override
514
    public void close() {
515
      closed = true;
1✔
516
      checkState(payload != null,
1✔
517
          "Lack of request message. GET request is only supported for unary requests");
518
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
519
      payload = null;
1✔
520
      headers = null;
1✔
521
    }
1✔
522

523
    /** Closes, without flush. */
524
    @Override
525
    public void dispose() {
526
      closed = true;
×
527
      payload = null;
×
528
      headers = null;
×
529
    }
×
530

531
    // Compression is not supported for GET encoding.
532
    @Override
533
    public Framer setMessageCompression(boolean enable) {
534
      return this;
×
535
    }
536

537
    @Override
538
    public Framer setCompressor(Compressor compressor) {
539
      return this;
×
540
    }
541

542
    // TODO(zsurocking): support this
543
    @Override
544
    public void setMaxOutboundMessageSize(int maxSize) {}
×
545
  }
546
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc