• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20013

09 Oct 2025 04:10AM UTC coverage: 88.535% (+0.02%) from 88.519%
#20013

push

github

web-flow
README: update remaining references of protoc (#12411)

34648 of 39135 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.22
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Preconditions;
27
import com.google.common.io.ByteStreams;
28
import io.grpc.Attributes;
29
import io.grpc.CallOptions;
30
import io.grpc.Codec;
31
import io.grpc.Compressor;
32
import io.grpc.Deadline;
33
import io.grpc.Decompressor;
34
import io.grpc.DecompressorRegistry;
35
import io.grpc.Grpc;
36
import io.grpc.Metadata;
37
import io.grpc.Status;
38
import io.grpc.internal.ClientStreamListener.RpcProgress;
39
import java.io.InputStream;
40
import java.util.concurrent.TimeUnit;
41
import java.util.logging.Level;
42
import java.util.logging.Logger;
43
import javax.annotation.Nullable;
44

45
/**
46
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
47
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
48
 * from the sending application thread.
49
 */
50
public abstract class AbstractClientStream extends AbstractStream
51
    implements ClientStream, MessageFramer.Sink {
52

53
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
54

55
  /**
56
   * A sink for outbound operations, separated from the stream simply to avoid name
57
   * collisions/confusion. Only called from application thread.
58
   */
59
  protected interface Sink {
60
    /** 
61
     * Sends the request headers to the remote end point.
62
     *
63
     * @param metadata the metadata to be sent
64
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
65
     *     when sending an unary GET request
66
     */
67
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
68

69
    /**
70
     * Sends an outbound frame to the remote end point.
71
     *
72
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
73
     *     endOfStream} with no data to send
74
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
75
     *     {@code true} if this is {@code true}
76
     * @param flush {@code true} if more data may not be arriving soon
77
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
78
     */
79
    void writeFrame(
80
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
81

82
    /**
83
     * Tears down the stream, typically in the event of a timeout. This method may be called
84
     * multiple times and from any thread.
85
     *
86
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
87
     * {@link AbstractClientStream#cancel} delegates to this method.
88
     */
89
    void cancel(Status status);
90
  }
91

92
  private final TransportTracer transportTracer;
93
  private final Framer framer;
94
  private final boolean shouldBeCountedForInUse;
95
  private final boolean useGet;
96
  private Metadata headers;
97
  /**
98
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
99
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
100
   * processed asynchronously.
101
   */
102
  private volatile boolean cancelled;
103

104
  /**
   * Creates the stream and selects the framer that matches the request encoding.
   *
   * @param bufferAllocator allocator handed to the {@link MessageFramer} (unused for GET)
   * @param statsTraceCtx stats/tracing context handed to the chosen framer
   * @param transportTracer tracer exposed through {@link #getTransportTracer()}; must not be null
   * @param headers the request headers; must not be null
   * @param callOptions used to decide whether this stream counts towards the transport's in-use
   *     state
   * @param useGet {@code true} to send the request through the {@link GetFramer}, which writes
   *     the headers together with the payload when the framer is closed
   */
  @SuppressWarnings("this-escape")
  protected AbstractClientStream(
      WritableBufferAllocator bufferAllocator,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer,
      Metadata headers,
      CallOptions callOptions,
      boolean useGet) {
    // Keep the reference in both modes. Previously it was stored only for non-GET streams, so
    // setDeadline() dereferenced a null field on a GET stream. In GET mode the GetFramer holds
    // this very same instance and writes it from close(), so a timeout set through this
    // reference is part of what gets sent.
    this.headers = checkNotNull(headers, "headers");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
    this.useGet = useGet;
    if (useGet) {
      this.framer = new GetFramer(headers, statsTraceCtx);
    } else {
      this.framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
    }
  }
123

124
  @Override
125
  public void setDeadline(Deadline deadline) {
126
    headers.discardAll(TIMEOUT_KEY);
1✔
127
    headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
128
  }
1✔
129

130
  @Override
131
  public void setMaxOutboundMessageSize(int maxSize) {
132
    framer.setMaxOutboundMessageSize(maxSize);
1✔
133
  }
1✔
134

135
  @Override
136
  public void setMaxInboundMessageSize(int maxSize) {
137
    transportState().setMaxInboundMessageSize(maxSize);
1✔
138
  }
1✔
139

140
  @Override
141
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
142
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
143
  }
1✔
144

145
  @Override
146
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
147
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
148
  }
1✔
149

150
  /** {@inheritDoc} */
151
  @Override
152
  protected abstract TransportState transportState();
153

154
  @Override
155
  public final void start(ClientStreamListener listener) {
156
    transportState().setListener(listener);
1✔
157
    if (!useGet) {
1✔
158
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
159
      headers = null;
1✔
160
    }
161
  }
1✔
162

163
  /**
164
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
165
   * unique sink.
166
   */
167
  protected abstract Sink abstractClientStreamSink();
168

169
  @Override
170
  protected final Framer framer() {
171
    return framer;
1✔
172
  }
173

174
  /**
175
   * Returns true if this stream should be counted when determining the in-use state of the
176
   * transport.
177
   */
178
  public final boolean shouldBeCountedForInUse() {
179
    return shouldBeCountedForInUse;
1✔
180
  }
181

182
  @Override
183
  public final void deliverFrame(
184
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
185
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
186
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
187
  }
1✔
188

189
  @Override
190
  public final void halfClose() {
191
    if (!transportState().isOutboundClosed()) {
1✔
192
      transportState().setOutboundClosed();
1✔
193
      endOfMessages();
1✔
194
    }
195
  }
1✔
196

197
  @Override
198
  public final void cancel(Status reason) {
199
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
200
    cancelled = true;
1✔
201
    abstractClientStreamSink().cancel(reason);
1✔
202
  }
1✔
203

204
  @Override
205
  public final boolean isReady() {
206
    return super.isReady() && !cancelled;
1✔
207
  }
208

209
  @Override
210
  public final void appendTimeoutInsight(InsightBuilder insight) {
211
    Attributes attrs = getAttributes();
1✔
212
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
213
  }
1✔
214

215
  protected TransportTracer getTransportTracer() {
216
    return transportTracer;
1✔
217
  }
218

219
  /** This should only be called from the transport thread. */
220
  protected abstract static class TransportState extends AbstractStream.TransportState {
221
    /** Whether listener.closed() has been called. */
222
    private final StatsTraceContext statsTraceCtx;
223
    private boolean listenerClosed;
224
    private ClientStreamListener listener;
225
    private boolean fullStreamDecompression;
226
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
227

228
    private boolean deframerClosed = false;
1✔
229
    private Runnable deframerClosedTask;
230

231
    /** Whether the client has half-closed the stream. */
232
    private volatile boolean outboundClosed;
233

234
    /**
235
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
236
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
237
     */
238
    private boolean statusReported;
239
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
240
    private boolean statusReportedIsOk;
241

242
    protected TransportState(
243
        int maxMessageSize,
244
        StatsTraceContext statsTraceCtx,
245
        TransportTracer transportTracer,
246
        CallOptions options) {
247
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
248
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
249
      if (options.getOnReadyThreshold() != null) {
1✔
250
        this.setOnReadyThreshold(options.getOnReadyThreshold());
1✔
251
      }
252
    }
1✔
253

254
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
255
      this.fullStreamDecompression = fullStreamDecompression;
1✔
256
    }
1✔
257

258
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
259
      checkState(this.listener == null, "Already called start");
1✔
260
      this.decompressorRegistry =
1✔
261
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
262
    }
1✔
263

264
    @VisibleForTesting
265
    public final void setListener(ClientStreamListener listener) {
266
      checkState(this.listener == null, "Already called setListener");
1✔
267
      this.listener = checkNotNull(listener, "listener");
1✔
268
    }
1✔
269

270
    @Override
271
    public void deframerClosed(boolean hasPartialMessage) {
272
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
273
      deframerClosed = true;
1✔
274
      if (statusReportedIsOk && hasPartialMessage) {
1✔
275
        transportReportStatus(
1✔
276
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
277
            true,
278
            new Metadata());
279
      }
280
      if (deframerClosedTask != null) {
1✔
281
        deframerClosedTask.run();
1✔
282
        deframerClosedTask = null;
1✔
283
      }
284
    }
1✔
285

286
    @Override
287
    protected final ClientStreamListener listener() {
288
      return listener;
1✔
289
    }
290

291
    private final void setOutboundClosed() {
292
      outboundClosed = true;
1✔
293
    }
1✔
294

295
    protected final boolean isOutboundClosed() {
296
      return outboundClosed;
1✔
297
    }
298

299
    /**
300
     * Called by transport implementations when they receive headers.
301
     *
302
     * @param headers the parsed headers
303
     */
304
    protected void inboundHeadersReceived(Metadata headers) {
305
      checkState(!statusReported, "Received headers on closed stream");
1✔
306
      statsTraceCtx.clientInboundHeaders(headers);
1✔
307

308
      boolean compressedStream = false;
1✔
309
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
310
      if (fullStreamDecompression && streamEncoding != null) {
1✔
311
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
312
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
313
          compressedStream = true;
1✔
314
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
315
          deframeFailed(
1✔
316
              Status.INTERNAL
317
                  .withDescription(
1✔
318
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
319
                  .asRuntimeException());
1✔
320
          return;
1✔
321
        }
322
      }
323

324
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
325
      if (messageEncoding != null) {
1✔
326
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
327
        if (decompressor == null) {
1✔
328
          deframeFailed(
1✔
329
              Status.INTERNAL
330
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
331
                  .asRuntimeException());
1✔
332
          return;
1✔
333
        } else if (decompressor != Codec.Identity.NONE) {
1✔
334
          if (compressedStream) {
1✔
335
            deframeFailed(
1✔
336
                Status.INTERNAL
337
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
338
                    .asRuntimeException());
1✔
339
            return;
1✔
340
          }
341
          setDecompressor(decompressor);
1✔
342
        }
343
      }
344

345
      listener().headersRead(headers);
1✔
346
    }
1✔
347

348
    /**
349
     * Processes the contents of a received data frame from the server.
350
     *
351
     * @param frame the received data frame. Its ownership is transferred to this method.
352
     */
353
    protected void inboundDataReceived(ReadableBuffer frame) {
354
      checkNotNull(frame, "frame");
1✔
355
      boolean needToCloseFrame = true;
1✔
356
      try {
357
        if (statusReported) {
1✔
358
          log.log(Level.INFO, "Received data on closed stream");
×
359
          return;
×
360
        }
361

362
        needToCloseFrame = false;
1✔
363
        deframe(frame);
1✔
364
      } finally {
365
        if (needToCloseFrame) {
1✔
366
          frame.close();
×
367
        }
368
      }
369
    }
1✔
370

371
    /**
372
     * Processes the trailers and status from the server.
373
     *
374
     * @param trailers the received trailers
375
     * @param status the status extracted from the trailers
376
     */
377
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
378
      checkNotNull(status, "status");
1✔
379
      checkNotNull(trailers, "trailers");
1✔
380
      if (statusReported) {
1✔
381
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
382
            new Object[]{status, trailers});
383
        return;
1✔
384
      }
385
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
386
      transportReportStatus(status, false, trailers);
1✔
387
    }
1✔
388

389
    /**
390
     * Report stream closure with status to the application layer if not already reported. This
391
     * method must be called from the transport thread.
392
     *
393
     * @param status the new status to set
394
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
395
     *        may already be queued up in the deframer. If {@code false}, the listener will be
396
     *        notified immediately after all currently completed messages in the deframer have been
397
     *        delivered to the application.
398
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
399
     *        server
400
     */
401
    public final void transportReportStatus(final Status status, boolean stopDelivery,
402
        final Metadata trailers) {
403
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
404
    }
1✔
405

406
    /**
407
     * Report stream closure with status to the application layer if not already reported. This
408
     * method must be called from the transport thread.
409
     *
410
     * @param status the new status to set
411
     * @param rpcProgress RPC progress that the
412
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
413
     *        will receive
414
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
415
     *        may already be queued up in the deframer and overrides any previously queued status.
416
     *        If {@code false}, the listener will be notified immediately after all currently
417
     *        completed messages in the deframer have been delivered to the application.
418
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
419
     *        server
420
     */
421
    public final void transportReportStatus(
422
        final Status status,
423
        final RpcProgress rpcProgress,
424
        boolean stopDelivery,
425
        final Metadata trailers) {
426
      checkNotNull(status, "status");
1✔
427
      checkNotNull(trailers, "trailers");
1✔
428
      // If stopDelivery, we continue in case previous invocation is waiting for stall
429
      if (statusReported && !stopDelivery) {
1✔
430
        return;
1✔
431
      }
432
      statusReported = true;
1✔
433
      statusReportedIsOk = status.isOk();
1✔
434
      onStreamDeallocated();
1✔
435

436
      if (deframerClosed) {
1✔
437
        deframerClosedTask = null;
1✔
438
        closeListener(status, rpcProgress, trailers);
1✔
439
      } else {
440
        deframerClosedTask =
1✔
441
            new Runnable() {
1✔
442
              @Override
443
              public void run() {
444
                closeListener(status, rpcProgress, trailers);
1✔
445
              }
1✔
446
            };
447
        closeDeframer(stopDelivery);
1✔
448
      }
449
    }
1✔
450

451
    /**
452
     * Closes the listener if not previously closed.
453
     *
454
     * @throws IllegalStateException if the call has not yet been started.
455
     */
456
    private void closeListener(
457
        Status status, RpcProgress rpcProgress, Metadata trailers) {
458
      if (!listenerClosed) {
1✔
459
        listenerClosed = true;
1✔
460
        statsTraceCtx.streamClosed(status);
1✔
461
        if (getTransportTracer() != null) {
1✔
462
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
463
        }
464
        listener().closed(status, rpcProgress, trailers);
1✔
465
      }
466
    }
1✔
467
  }
468

469
  private class GetFramer implements Framer {
470
    private Metadata headers;
471
    private boolean closed;
472
    private final StatsTraceContext statsTraceCtx;
473
    private byte[] payload;
474

475
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
476
      this.headers = checkNotNull(headers, "headers");
1✔
477
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
478
    }
1✔
479

480
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
481
    @Override
482
    public void writePayload(InputStream message) {
483
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
484
      try {
485
        payload = ByteStreams.toByteArray(message);
1✔
486
      } catch (java.io.IOException ex) {
×
487
        throw new RuntimeException(ex);
×
488
      }
1✔
489
      statsTraceCtx.outboundMessage(0);
1✔
490
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
491
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
492
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
493
      // it using e.g., base64.  However, we are not supposed to know such detail here.
494
      //
495
      // We don't want to move this line to where the encoding happens either, because we'd better
496
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
497
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
498
      //
499
      // Because the payload is usually very small, people shouldn't care about the size difference
500
      // caused by encoding.
501
      statsTraceCtx.outboundWireSize(payload.length);
1✔
502
    }
1✔
503

504
    @Override
505
    public void flush() {}
1✔
506

507
    @Override
508
    public boolean isClosed() {
509
      return closed;
1✔
510
    }
511

512
    /** Closes, with flush. */
513
    @Override
514
    public void close() {
515
      closed = true;
1✔
516
      checkState(payload != null,
1✔
517
          "Lack of request message. GET request is only supported for unary requests");
518
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
519
      payload = null;
1✔
520
      headers = null;
1✔
521
    }
1✔
522

523
    /** Closes, without flush. */
524
    @Override
525
    public void dispose() {
526
      closed = true;
×
527
      payload = null;
×
528
      headers = null;
×
529
    }
×
530

531
    // Compression is not supported for GET encoding.
532
    @Override
533
    public Framer setMessageCompression(boolean enable) {
534
      return this;
×
535
    }
536

537
    @Override
538
    public Framer setCompressor(Compressor compressor) {
539
      return this;
×
540
    }
541

542
    // TODO(zsurocking): support this
543
    @Override
544
    public void setMaxOutboundMessageSize(int maxSize) {}
×
545
  }
546
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc