• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19962

28 Aug 2025 03:57AM UTC coverage: 88.537% (-0.02%) from 88.554%
#19962

push

github

web-flow
servlet: extract ServletServerStream.serializeHeaders() method (#12299)

34680 of 39170 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.83
/../servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC;
20
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
21
import static java.lang.Math.max;
22
import static java.lang.Math.min;
23
import static java.util.logging.Level.FINE;
24
import static java.util.logging.Level.FINEST;
25
import static java.util.logging.Level.WARNING;
26

27
import com.google.common.io.BaseEncoding;
28
import com.google.common.util.concurrent.MoreExecutors;
29
import io.grpc.Attributes;
30
import io.grpc.InternalLogId;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.internal.AbstractServerStream;
34
import io.grpc.internal.GrpcUtil;
35
import io.grpc.internal.SerializingExecutor;
36
import io.grpc.internal.StatsTraceContext;
37
import io.grpc.internal.TransportFrameUtil;
38
import io.grpc.internal.TransportTracer;
39
import io.grpc.internal.WritableBuffer;
40
import java.io.IOException;
41
import java.nio.charset.StandardCharsets;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.Map;
45
import java.util.function.BiConsumer;
46
import java.util.function.Supplier;
47
import java.util.logging.Logger;
48
import javax.annotation.Nullable;
49
import javax.servlet.AsyncContext;
50
import javax.servlet.WriteListener;
51
import javax.servlet.http.HttpServletResponse;
52

53
final class ServletServerStream extends AbstractServerStream {
54

55
  private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName());
1✔
56

57
  private final ServletTransportState transportState;
58
  private final Sink sink = new Sink();
1✔
59
  private final HttpServletResponse resp;
60
  private final Attributes attributes;
61
  private final String authority;
62
  private final InternalLogId logId;
63
  private final AsyncServletOutputStreamWriter writer;
64
  /**
65
   * If the async servlet operation has been completed.
66
   */
67
  volatile boolean asyncCompleted = false;
1✔
68

69
  ServletServerStream(
70
      AsyncContext asyncCtx,
71
      StatsTraceContext statsTraceCtx,
72
      int maxInboundMessageSize,
73
      Attributes attributes,
74
      String authority,
75
      InternalLogId logId) throws IOException {
76
    super(ByteArrayWritableBuffer::new, statsTraceCtx);
1✔
77
    transportState =
1✔
78
        new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
79
    this.attributes = attributes;
1✔
80
    this.authority = authority;
1✔
81
    this.logId = logId;
1✔
82
    this.resp = (HttpServletResponse) asyncCtx.getResponse();
1✔
83
    this.writer = new AsyncServletOutputStreamWriter(
1✔
84
        asyncCtx, transportState, logId);
85
    resp.getOutputStream().setWriteListener(new GrpcWriteListener());
1✔
86
  }
1✔
87

88
  @Override
89
  protected ServletTransportState transportState() {
90
    return transportState;
1✔
91
  }
92

93
  @Override
94
  public Attributes getAttributes() {
95
    return attributes;
1✔
96
  }
97

98
  @Override
99
  public String getAuthority() {
100
    return authority;
1✔
101
  }
102

103
  @Override
104
  public int streamId() {
105
    return -1;
1✔
106
  }
107

108
  @Override
109
  protected Sink abstractServerStreamSink() {
110
    return sink;
1✔
111
  }
112

113
  private void writeHeadersToServletResponse(Metadata metadata) {
114
    // Discard any application supplied duplicates of the reserved headers
115
    metadata.discardAll(CONTENT_TYPE_KEY);
1✔
116
    metadata.discardAll(GrpcUtil.TE_HEADER);
1✔
117
    metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
1✔
118

119
    if (logger.isLoggable(FINE)) {
1✔
120
      logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata});
×
121
    }
122

123
    resp.setStatus(HttpServletResponse.SC_OK);
1✔
124
    resp.setContentType(CONTENT_TYPE_GRPC);
1✔
125

126
    serializeHeaders(metadata, resp::addHeader);
1✔
127
  }
1✔
128

129
  private static void serializeHeaders(Metadata metadata, BiConsumer<String, String> consumer) {
130
    byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata);
1✔
131
    for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
132
      consumer.accept(
1✔
133
          new String(serializedHeaders[i], StandardCharsets.US_ASCII),
134
          new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
135
    }
136
  }
1✔
137

138
  final class ServletTransportState extends TransportState {
139

140
    private final SerializingExecutor transportThreadExecutor =
1✔
141
        new SerializingExecutor(MoreExecutors.directExecutor());
1✔
142

143
    private ServletTransportState(
144
        int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
1✔
145
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
146
    }
1✔
147

148
    @Override
149
    public void runOnTransportThread(Runnable r) {
150
      transportThreadExecutor.execute(r);
1✔
151
    }
1✔
152

153
    @Override
154
    public void bytesRead(int numBytes) {
155
      // no-op
156
      // no flow control yet
157
    }
1✔
158

159
    @Override
160
    public void deframeFailed(Throwable cause) {
161
      if (logger.isLoggable(WARNING)) {
×
162
        logger.log(WARNING, String.format("[{%s}] Exception processing message", logId), cause);
×
163
      }
164
      cancel(Status.fromThrowable(cause));
×
165
    }
×
166
  }
167

168
  private static final class ByteArrayWritableBuffer implements WritableBuffer {
169

170
    private final int capacity;
171
    final byte[] bytes;
172
    private int index;
173

174
    ByteArrayWritableBuffer(int capacityHint) {
1✔
175
      this.bytes = new byte[min(1024 * 1024, capacityHint)];
1✔
176
      this.capacity = bytes.length;
1✔
177
    }
1✔
178

179
    @Override
180
    public void write(byte[] src, int srcIndex, int length) {
181
      System.arraycopy(src, srcIndex, bytes, index, length);
1✔
182
      index += length;
1✔
183
    }
1✔
184

185
    @Override
186
    public void write(byte b) {
187
      bytes[index++] = b;
×
188
    }
×
189

190
    @Override
191
    public int writableBytes() {
192
      return capacity - index;
1✔
193
    }
194

195
    @Override
196
    public int readableBytes() {
197
      return index;
1✔
198
    }
199

200
    @Override
201
    public void release() {}
×
202
  }
203

204
  private final class GrpcWriteListener implements WriteListener {
1✔
205

206
    @Override
207
    public void onError(Throwable t) {
208
      if (logger.isLoggable(FINE)) {
×
209
        logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
×
210
      }
211

212
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
213
      // Else, the container will send RST_STREAM at the end.
214
      if (!resp.isCommitted()) {
×
215
        cancel(Status.fromThrowable(t));
×
216
      } else {
217
        transportState.runOnTransportThread(
×
218
            () -> transportState.transportReportStatus(Status.fromThrowable(t)));
×
219
      }
220
    }
×
221

222
    @Override
223
    public void onWritePossible() throws IOException {
224
      writer.onWritePossible();
1✔
225
    }
1✔
226
  }
227

228
  private final class Sink implements AbstractServerStream.Sink {
1✔
229
    final TrailerSupplier trailerSupplier = new TrailerSupplier();
1✔
230

231
    @Override
232
    public void writeHeaders(Metadata headers, boolean flush) {
233
      writeHeadersToServletResponse(headers);
1✔
234
      resp.setTrailerFields(trailerSupplier);
1✔
235
      try {
236
        writer.flush();
1✔
237
      } catch (IOException e) {
×
238
        logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e);
×
239
        cancel(Status.fromThrowable(e));
×
240
      }
1✔
241
    }
1✔
242

243
    @Override
244
    public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) {
245
      if (frame == null && !flush) {
1✔
246
        return;
×
247
      }
248

249
      if (logger.isLoggable(FINEST)) {
1✔
250
        logger.log(
×
251
            FINEST,
252
            "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
253
            new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
×
254
      }
255

256
      try {
257
        if (frame != null) {
1✔
258
          int numBytes = frame.readableBytes();
1✔
259
          if (numBytes > 0) {
1✔
260
            onSendingBytes(numBytes);
1✔
261
          }
262
          writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes());
1✔
263
        }
264

265
        if (flush) {
1✔
266
          writer.flush();
1✔
267
        }
268
      } catch (IOException e) {
×
269
        logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e);
×
270
        cancel(Status.fromThrowable(e));
×
271
      }
1✔
272
    }
1✔
273

274
    @Override
275
    public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
276
      if (logger.isLoggable(FINE)) {
1✔
277
        logger.log(
×
278
            FINE,
279
            "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
280
            new Object[] {logId, trailers, headersSent, status});
×
281
      }
282
      if (!headersSent) {
1✔
283
        writeHeadersToServletResponse(trailers);
1✔
284
      } else {
285
        serializeHeaders(trailers,
1✔
286
            (k, v) -> trailerSupplier.get().merge(k, v, (oldV, newV) -> oldV + "," + newV));
1✔
287
      }
288

289
      writer.complete();
1✔
290
    }
1✔
291

292
    @Override
293
    public void cancel(Status status) {
294
      transportState.runOnTransportThread(() -> transportState.transportReportStatus(status));
1✔
295
      if (asyncCompleted) {
1✔
296
        logger.fine("ignore cancel as already completed");
×
297
        return;
×
298
      }
299
      // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300
      close(status, new Metadata());
1✔
301
      // close() calls writeTrailers(), which calls AsyncContext.complete()
302
    }
1✔
303
  }
304

305
  private static final class TrailerSupplier implements Supplier<Map<String, String>> {
306
    final Map<String, String> trailers = Collections.synchronizedMap(new HashMap<>());
1✔
307

308
    TrailerSupplier() {}
1✔
309

310
    @Override
311
    public Map<String, String> get() {
312
      return trailers;
1✔
313
    }
314
  }
315

316
  static String toHexString(byte[] bytes, int length) {
317
    String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64));
×
318
    if (length > 80) {
×
319
      hex += "...";
×
320
    }
321
    if (length > 64) {
×
322
      int offset = max(64, length - 16);
×
323
      hex += BaseEncoding.base16().encode(bytes, offset, length - offset);
×
324
    }
325
    return hex;
×
326
  }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc