• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19108

21 Mar 2024 10:37PM UTC coverage: 88.277% (-0.002%) from 88.279%
#19108

push

github

web-flow
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener

31190 of 35332 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.74
/../core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.base.Charsets;
20
import com.google.common.base.Preconditions;
21
import io.grpc.CallOptions;
22
import io.grpc.InternalMetadata;
23
import io.grpc.InternalStatus;
24
import io.grpc.Metadata;
25
import io.grpc.Status;
26
import java.nio.charset.Charset;
27
import javax.annotation.Nullable;
28

29
/**
30
 * Base implementation for client streams using HTTP2 as the transport.
31
 */
32
public abstract class Http2ClientStreamTransportState extends AbstractClientStream.TransportState {
33

34
  /**
35
   * Metadata marshaller for HTTP status lines.
36
   */
37
  private static final InternalMetadata.TrustedAsciiMarshaller<Integer> HTTP_STATUS_MARSHALLER =
1✔
38
      new InternalMetadata.TrustedAsciiMarshaller<Integer>() {
1✔
39
        @Override
40
        public byte[] toAsciiString(Integer value) {
41
          throw new UnsupportedOperationException();
×
42
        }
43

44
        /**
45
         * RFC 7231 says status codes are 3 digits long.
46
         *
47
         * @see <a href="https://tools.ietf.org/html/rfc7231#section-6">RFC 7231</a>
48
         */
49
        @Override
50
        public Integer parseAsciiString(byte[] serialized) {
51
          if (serialized.length >= 3) {
1✔
52
            return (serialized[0] - '0') * 100 + (serialized[1] - '0') * 10 + (serialized[2] - '0');
1✔
53
          }
54
          throw new NumberFormatException(
×
55
              "Malformed status code " + new String(serialized, InternalMetadata.US_ASCII));
56
        }
57
      };
58

59
  private static final Metadata.Key<Integer> HTTP2_STATUS = InternalMetadata.keyOf(":status",
1✔
60
      HTTP_STATUS_MARSHALLER);
61

62
  /** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */
63
  private Status transportError;
64
  private Metadata transportErrorMetadata;
65
  private Charset errorCharset = Charsets.UTF_8;
1✔
66
  private boolean headersReceived;
67

68
  protected Http2ClientStreamTransportState(
69
      int maxMessageSize,
70
      StatsTraceContext statsTraceCtx,
71
      TransportTracer transportTracer,
72
      CallOptions options) {
73
    super(maxMessageSize, statsTraceCtx, transportTracer, options);
1✔
74
  }
1✔
75

76
  /**
77
   * Called to process a failure in HTTP/2 processing. It should notify the transport to cancel the
78
   * stream and call {@code transportReportStatus()}.
79
   */
80
  protected abstract void http2ProcessingFailed(
81
      Status status, boolean stopDelivery, Metadata trailers);
82

83
  /**
84
   * Called by subclasses whenever {@code Headers} are received from the transport.
85
   *
86
   * @param headers the received headers
87
   */
88
  protected void transportHeadersReceived(Metadata headers) {
89
    Preconditions.checkNotNull(headers, "headers");
1✔
90
    if (transportError != null) {
1✔
91
      // Already received a transport error so just augment it. Something is really, really strange.
92
      transportError = transportError.augmentDescription("headers: " + headers);
1✔
93
      return;
1✔
94
    }
95
    try {
96
      if (headersReceived) {
1✔
97
        transportError = Status.INTERNAL.withDescription("Received headers twice");
1✔
98
        return;
1✔
99
      }
100
      Integer httpStatus = headers.get(HTTP2_STATUS);
1✔
101
      if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) {
1✔
102
        // Ignore the headers. See RFC 7540 ยง8.1
103
        return;
1✔
104
      }
105
      headersReceived = true;
1✔
106

107
      transportError = validateInitialMetadata(headers);
1✔
108
      if (transportError != null) {
1✔
109
        return;
1✔
110
      }
111

112
      stripTransportDetails(headers);
1✔
113
      inboundHeadersReceived(headers);
1✔
114
    } finally {
115
      if (transportError != null) {
1✔
116
        // Note we don't immediately report the transport error, instead we wait for more data on
117
        // the stream so we can accumulate more detail into the error before reporting it.
118
        transportError = transportError.augmentDescription("headers: " + headers);
1✔
119
        transportErrorMetadata = headers;
1✔
120
        errorCharset = extractCharset(headers);
1✔
121
      }
122
    }
123
  }
1✔
124

125
  /**
126
   * Called by subclasses whenever a data frame is received from the transport.
127
   *
128
   * @param frame the received data frame
129
   * @param endOfStream {@code true} if there will be no more data received for this stream
130
   */
131
  protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) {
132
    if (transportError != null) {
1✔
133
      // We've already detected a transport error and now we're just accumulating more detail
134
      // for it.
135
      transportError = transportError.augmentDescription("DATA-----------------------------\n"
1✔
136
          + ReadableBuffers.readAsString(frame, errorCharset));
1✔
137
      frame.close();
1✔
138
      if (transportError.getDescription().length() > 1000 || endOfStream) {
1✔
139
        http2ProcessingFailed(transportError, false, transportErrorMetadata);
1✔
140
      }
141
    } else {
142
      if (!headersReceived) {
1✔
143
        http2ProcessingFailed(
1✔
144
            Status.INTERNAL.withDescription("headers not received before payload"),
1✔
145
            false,
146
            new Metadata());
147
        return;
1✔
148
      }
149
      int frameSize = frame.readableBytes();
1✔
150
      inboundDataReceived(frame);
1✔
151
      if (endOfStream) {
1✔
152
        // This is a protocol violation as we expect to receive trailers.
153
        if (frameSize > 0) {
1✔
154
          transportError = Status.INTERNAL
1✔
155
              .withDescription("Received unexpected EOS on non-empty DATA frame from server");
1✔
156
        } else {
157
          transportError = Status.INTERNAL
1✔
158
              .withDescription("Received unexpected EOS on empty DATA frame from server");
1✔
159
        }
160
        transportErrorMetadata = new Metadata();
1✔
161
        transportReportStatus(transportError, false, transportErrorMetadata);
1✔
162
      }
163
    }
164
  }
1✔
165

166
  /**
167
   * Called by subclasses for the terminal trailer metadata on a stream.
168
   *
169
   * @param trailers the received terminal trailer metadata
170
   */
171
  protected void transportTrailersReceived(Metadata trailers) {
172
    Preconditions.checkNotNull(trailers, "trailers");
1✔
173
    if (transportError == null && !headersReceived) {
1✔
174
      transportError = validateInitialMetadata(trailers);
1✔
175
      if (transportError != null) {
1✔
176
        transportErrorMetadata = trailers;
1✔
177
      }
178
    }
179
    if (transportError != null) {
1✔
180
      transportError = transportError.augmentDescription("trailers: " + trailers);
1✔
181
      http2ProcessingFailed(transportError, false, transportErrorMetadata);
1✔
182
    } else {
183
      Status status = statusFromTrailers(trailers);
1✔
184
      stripTransportDetails(trailers);
1✔
185
      inboundTrailersReceived(trailers, status);
1✔
186
    }
187
  }
1✔
188

189
  /**
190
   * Extract the response status from trailers.
191
   */
192
  private Status statusFromTrailers(Metadata trailers) {
193
    Status status = trailers.get(InternalStatus.CODE_KEY);
1✔
194
    if (status != null) {
1✔
195
      return status.withDescription(trailers.get(InternalStatus.MESSAGE_KEY));
1✔
196
    }
197
    // No status; something is broken. Try to provide a resonanable error.
198
    if (headersReceived) {
1✔
199
      return Status.UNKNOWN.withDescription("missing GRPC status in response");
1✔
200
    }
201
    Integer httpStatus = trailers.get(HTTP2_STATUS);
1✔
202
    if (httpStatus != null) {
1✔
203
      status = GrpcUtil.httpStatusToGrpcStatus(httpStatus);
1✔
204
    } else {
205
      status = Status.INTERNAL.withDescription("missing HTTP status code");
×
206
    }
207
    return status.augmentDescription(
1✔
208
        "missing GRPC status, inferred error from HTTP status code");
209
  }
210

211
  /**
212
   * Inspect initial headers to make sure they conform to HTTP and gRPC, returning a {@code Status}
213
   * on failure.
214
   *
215
   * @return status with description of failure, or {@code null} when valid
216
   */
217
  @Nullable
218
  private Status validateInitialMetadata(Metadata headers) {
219
    Integer httpStatus = headers.get(HTTP2_STATUS);
1✔
220
    if (httpStatus == null) {
1✔
221
      return Status.INTERNAL.withDescription("Missing HTTP status code");
1✔
222
    }
223
    String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
1✔
224
    if (!GrpcUtil.isGrpcContentType(contentType)) {
1✔
225
      return GrpcUtil.httpStatusToGrpcStatus(httpStatus)
1✔
226
          .augmentDescription("invalid content-type: " + contentType);
1✔
227
    }
228
    return null;
1✔
229
  }
230

231
  /**
232
   * Inspect the raw metadata and figure out what charset is being used.
233
   */
234
  private static Charset extractCharset(Metadata headers) {
235
    String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
1✔
236
    if (contentType != null) {
1✔
237
      String[] split = contentType.split("charset=", 2);
1✔
238
      try {
239
        return Charset.forName(split[split.length - 1].trim());
1✔
240
      } catch (Exception t) {
1✔
241
        // Ignore and assume UTF-8
242
      }
243
    }
244
    return Charsets.UTF_8;
1✔
245
  }
246

247
  /**
248
   * Strip HTTP transport implementation details so they don't leak via metadata into
249
   * the application layer.
250
   */
251
  private static void stripTransportDetails(Metadata metadata) {
252
    metadata.discardAll(HTTP2_STATUS);
1✔
253
    metadata.discardAll(InternalStatus.CODE_KEY);
1✔
254
    metadata.discardAll(InternalStatus.MESSAGE_KEY);
1✔
255
  }
1✔
256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc