• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20123

22 Dec 2025 07:27PM UTC coverage: 88.719% (-0.03%) from 88.747%
#20123

push

github

web-flow
core: Delete ReadableBuffer.readBytes(ByteBuffer) (#12580)

At the very least it isn't used now. The method is as old as
ReadableBuffer itself (05a2b252b), but it appears to have never actually
been used.

35445 of 39952 relevant lines covered (88.72%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.14
/../servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC;
20
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
21
import static java.lang.Math.max;
22
import static java.lang.Math.min;
23
import static java.util.logging.Level.FINE;
24
import static java.util.logging.Level.FINEST;
25
import static java.util.logging.Level.WARNING;
26

27
import com.google.common.io.BaseEncoding;
28
import com.google.common.util.concurrent.MoreExecutors;
29
import io.grpc.Attributes;
30
import io.grpc.InternalLogId;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.internal.AbstractServerStream;
34
import io.grpc.internal.GrpcUtil;
35
import io.grpc.internal.SerializingExecutor;
36
import io.grpc.internal.StatsTraceContext;
37
import io.grpc.internal.TransportFrameUtil;
38
import io.grpc.internal.TransportTracer;
39
import io.grpc.internal.WritableBuffer;
40
import java.io.IOException;
41
import java.nio.charset.StandardCharsets;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.Map;
45
import java.util.function.BiConsumer;
46
import java.util.function.Supplier;
47
import java.util.logging.Logger;
48
import javax.annotation.Nullable;
49
import javax.servlet.AsyncContext;
50
import javax.servlet.WriteListener;
51
import javax.servlet.http.HttpServletResponse;
52

53
final class ServletServerStream extends AbstractServerStream {
54

55
  private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName());
1✔
56

57
  private final ServletTransportState transportState;
58
  private final Sink sink = new Sink();
1✔
59
  private final HttpServletResponse resp;
60
  private final Attributes attributes;
61
  private final String authority;
62
  private final InternalLogId logId;
63
  private final AsyncServletOutputStreamWriter writer;
64
  /**
65
   * If the async servlet operation has been completed.
66
   */
67
  volatile boolean asyncCompleted = false;
1✔
68

69
  ServletServerStream(
70
      AsyncContext asyncCtx,
71
      StatsTraceContext statsTraceCtx,
72
      int maxInboundMessageSize,
73
      Attributes attributes,
74
      String authority,
75
      InternalLogId logId) throws IOException {
76
    super(ByteArrayWritableBuffer::new, statsTraceCtx);
1✔
77
    transportState =
1✔
78
        new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
79
    this.attributes = attributes;
1✔
80
    this.authority = authority;
1✔
81
    this.logId = logId;
1✔
82
    this.resp = (HttpServletResponse) asyncCtx.getResponse();
1✔
83
    this.writer = new AsyncServletOutputStreamWriter(
1✔
84
        asyncCtx, transportState, logId);
85
    resp.getOutputStream().setWriteListener(new GrpcWriteListener());
1✔
86
  }
1✔
87

88
  @Override
89
  protected ServletTransportState transportState() {
90
    return transportState;
1✔
91
  }
92

93
  @Override
94
  public Attributes getAttributes() {
95
    return attributes;
1✔
96
  }
97

98
  @Override
99
  public String getAuthority() {
100
    return authority;
1✔
101
  }
102

103
  @Override
104
  public int streamId() {
105
    return -1;
1✔
106
  }
107

108
  @Override
109
  protected Sink abstractServerStreamSink() {
110
    return sink;
1✔
111
  }
112

113
  private void writeHeadersToServletResponse(Metadata metadata) {
114
    // Discard any application supplied duplicates of the reserved headers
115
    metadata.discardAll(CONTENT_TYPE_KEY);
1✔
116
    metadata.discardAll(GrpcUtil.TE_HEADER);
1✔
117
    metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
1✔
118

119
    if (logger.isLoggable(FINE)) {
1✔
120
      logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata});
×
121
    }
122

123
    resp.setStatus(HttpServletResponse.SC_OK);
1✔
124
    resp.setContentType(CONTENT_TYPE_GRPC);
1✔
125

126
    serializeHeaders(metadata, resp::addHeader);
1✔
127
  }
1✔
128

129
  private static void serializeHeaders(Metadata metadata, BiConsumer<String, String> consumer) {
130
    byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata);
1✔
131
    for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
132
      consumer.accept(
1✔
133
          new String(serializedHeaders[i], StandardCharsets.US_ASCII),
134
          new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
135
    }
136
  }
1✔
137

138
  final class ServletTransportState extends TransportState {
139

140
    private final SerializingExecutor transportThreadExecutor =
1✔
141
        new SerializingExecutor(MoreExecutors.directExecutor());
1✔
142

143
    private ServletTransportState(
144
        int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
1✔
145
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
146
    }
1✔
147

148
    @Override
149
    public void runOnTransportThread(Runnable r) {
150
      transportThreadExecutor.execute(r);
1✔
151
    }
1✔
152

153
    @Override
154
    public void bytesRead(int numBytes) {
155
      // no-op
156
      // no flow control yet
157
    }
1✔
158

159
    @Override
160
    public void deframeFailed(Throwable cause) {
161
      if (logger.isLoggable(WARNING)) {
×
162
        logger.log(WARNING, String.format("[{%s}] Exception processing message", logId), cause);
×
163
      }
164
      cancel(Status.fromThrowable(cause));
×
165
    }
×
166
  }
167

168
  private static final class ByteArrayWritableBuffer implements WritableBuffer {
169

170
    private final int capacity;
171
    final byte[] bytes;
172
    private int index;
173

174
    ByteArrayWritableBuffer(int capacityHint) {
1✔
175
      this.bytes = new byte[min(1024 * 1024, capacityHint)];
1✔
176
      this.capacity = bytes.length;
1✔
177
    }
1✔
178

179
    @Override
180
    public void write(byte[] src, int srcIndex, int length) {
181
      System.arraycopy(src, srcIndex, bytes, index, length);
1✔
182
      index += length;
1✔
183
    }
1✔
184

185
    @Override
186
    public void write(byte b) {
187
      bytes[index++] = b;
×
188
    }
×
189

190
    @Override
191
    public int writableBytes() {
192
      return capacity - index;
1✔
193
    }
194

195
    @Override
196
    public int readableBytes() {
197
      return index;
1✔
198
    }
199

200
    @Override
201
    public void release() {}
×
202
  }
203

204
  private final class GrpcWriteListener implements WriteListener {
1✔
205

206
    @Override
207
    public void onError(Throwable t) {
208
      if (logger.isLoggable(FINE)) {
1✔
209
        logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
×
210
      }
211

212
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
213
      // Else, the container will send RST_STREAM at the end.
214
      if (!resp.isCommitted()) {
1✔
215
        cancel(Status.fromThrowable(t));
×
216
      } else {
217
        transportState.runOnTransportThread(
1✔
218
            () -> transportState.transportReportStatus(Status.fromThrowable(t)));
1✔
219
      }
220
    }
1✔
221

222
    @Override
223
    public void onWritePossible() throws IOException {
224
      writer.onWritePossible();
1✔
225
    }
1✔
226
  }
227

228
  private final class Sink implements AbstractServerStream.Sink {
1✔
229
    final TrailerSupplier trailerSupplier = new TrailerSupplier();
1✔
230

231
    @Override
232
    public void writeHeaders(Metadata headers, boolean flush) {
233
      writeHeadersToServletResponse(headers);
1✔
234
      resp.setTrailerFields(trailerSupplier);
1✔
235
      try {
236
        writer.flush();
1✔
237
      } catch (IOException e) {
×
238
        logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e);
×
239
        cancel(Status.fromThrowable(e));
×
240
      }
1✔
241
    }
1✔
242

243
    @Override
244
    public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) {
245
      if (frame == null && !flush) {
1✔
246
        return;
×
247
      }
248

249
      if (logger.isLoggable(FINEST)) {
1✔
250
        logger.log(
×
251
            FINEST,
252
            "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
253
            new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
×
254
      }
255

256
      try {
257
        if (frame != null) {
1✔
258
          int numBytes = frame.readableBytes();
1✔
259
          if (numBytes > 0) {
1✔
260
            onSendingBytes(numBytes);
1✔
261
          }
262
          writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes());
1✔
263
        }
264

265
        if (flush) {
1✔
266
          writer.flush();
1✔
267
        }
268
      } catch (IOException e) {
×
269
        logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e);
×
270
        cancel(Status.fromThrowable(e));
×
271
      }
1✔
272
    }
1✔
273

274
    @Override
275
    public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
276
      if (logger.isLoggable(FINE)) {
1✔
277
        logger.log(
×
278
            FINE,
279
            "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
280
            new Object[] {logId, trailers, headersSent, status});
×
281
      }
282
      if (!headersSent) {
1✔
283
        writeHeadersToServletResponse(trailers);
1✔
284
      } else {
285
        serializeHeaders(trailers,
1✔
286
            (k, v) -> trailerSupplier.get().merge(k, v, (oldV, newV) -> oldV + "," + newV));
1✔
287
      }
288

289
      writer.complete();
1✔
290
    }
1✔
291

292
    @Override
293
    public void cancel(Status status) {
294
      transportState.runOnTransportThread(() -> transportState.transportReportStatus(status));
1✔
295
      if (asyncCompleted) {
1✔
296
        logger.fine("ignore cancel as already completed");
×
297
        return;
×
298
      }
299
      // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300
      close(status, new Metadata());
1✔
301
      // close() calls writeTrailers(), which calls AsyncContext.complete()
302
    }
1✔
303
  }
304

305
  private static final class TrailerSupplier implements Supplier<Map<String, String>> {
306
    final Map<String, String> trailers = Collections.synchronizedMap(new HashMap<>());
1✔
307

308
    TrailerSupplier() {}
1✔
309

310
    @Override
311
    public Map<String, String> get() {
312
      return trailers;
1✔
313
    }
314
  }
315

316
  static String toHexString(byte[] bytes, int length) {
317
    String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64));
×
318
    if (length > 80) {
×
319
      hex += "...";
×
320
    }
321
    if (length > 64) {
×
322
      int offset = max(64, length - 16);
×
323
      hex += BaseEncoding.base16().encode(bytes, offset, length - offset);
×
324
    }
325
    return hex;
×
326
  }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc