• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18809

24 Aug 2023 05:37PM CUT coverage: 88.286% (-0.02%) from 88.309%
#18809

push

github-actions

web-flow
examples: Android helloworld to pass Google lint (#10518)

These changes allow the Android helloworld example to pass the lint
checks of the Google internal build system.

30327 of 34351 relevant lines covered (88.29%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.37
/../servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC;
20
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
21
import static java.lang.Math.max;
22
import static java.lang.Math.min;
23
import static java.util.logging.Level.FINE;
24
import static java.util.logging.Level.FINEST;
25
import static java.util.logging.Level.WARNING;
26

27
import com.google.common.io.BaseEncoding;
28
import com.google.common.util.concurrent.MoreExecutors;
29
import io.grpc.Attributes;
30
import io.grpc.InternalLogId;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.Status.Code;
34
import io.grpc.internal.AbstractServerStream;
35
import io.grpc.internal.GrpcUtil;
36
import io.grpc.internal.SerializingExecutor;
37
import io.grpc.internal.StatsTraceContext;
38
import io.grpc.internal.TransportFrameUtil;
39
import io.grpc.internal.TransportTracer;
40
import io.grpc.internal.WritableBuffer;
41
import java.io.IOException;
42
import java.nio.charset.StandardCharsets;
43
import java.util.Collections;
44
import java.util.HashMap;
45
import java.util.Map;
46
import java.util.concurrent.CountDownLatch;
47
import java.util.concurrent.TimeUnit;
48
import java.util.function.Supplier;
49
import java.util.logging.Logger;
50
import javax.annotation.Nullable;
51
import javax.servlet.AsyncContext;
52
import javax.servlet.WriteListener;
53
import javax.servlet.http.HttpServletResponse;
54

55
final class ServletServerStream extends AbstractServerStream {
56

57
  private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName());
1✔
58

59
  private final ServletTransportState transportState;
60
  private final Sink sink = new Sink();
1✔
61
  private final AsyncContext asyncCtx;
62
  private final HttpServletResponse resp;
63
  private final Attributes attributes;
64
  private final String authority;
65
  private final InternalLogId logId;
66
  private final AsyncServletOutputStreamWriter writer;
67

68
  ServletServerStream(
69
      AsyncContext asyncCtx,
70
      StatsTraceContext statsTraceCtx,
71
      int maxInboundMessageSize,
72
      Attributes attributes,
73
      String authority,
74
      InternalLogId logId) throws IOException {
75
    super(ByteArrayWritableBuffer::new, statsTraceCtx);
1✔
76
    transportState =
1✔
77
        new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
78
    this.attributes = attributes;
1✔
79
    this.authority = authority;
1✔
80
    this.logId = logId;
1✔
81
    this.asyncCtx = asyncCtx;
1✔
82
    this.resp = (HttpServletResponse) asyncCtx.getResponse();
1✔
83
    this.writer = new AsyncServletOutputStreamWriter(
1✔
84
        asyncCtx, transportState, logId);
85
    resp.getOutputStream().setWriteListener(new GrpcWriteListener());
1✔
86
  }
1✔
87

88
  @Override
89
  protected ServletTransportState transportState() {
90
    return transportState;
1✔
91
  }
92

93
  @Override
94
  public Attributes getAttributes() {
95
    return attributes;
1✔
96
  }
97

98
  @Override
99
  public String getAuthority() {
100
    return authority;
1✔
101
  }
102

103
  @Override
104
  public int streamId() {
105
    return -1;
1✔
106
  }
107

108
  @Override
109
  protected Sink abstractServerStreamSink() {
110
    return sink;
1✔
111
  }
112

113
  private void writeHeadersToServletResponse(Metadata metadata) {
114
    // Discard any application supplied duplicates of the reserved headers
115
    metadata.discardAll(CONTENT_TYPE_KEY);
1✔
116
    metadata.discardAll(GrpcUtil.TE_HEADER);
1✔
117
    metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
1✔
118

119
    if (logger.isLoggable(FINE)) {
1✔
120
      logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata});
×
121
    }
122

123
    resp.setStatus(HttpServletResponse.SC_OK);
1✔
124
    resp.setContentType(CONTENT_TYPE_GRPC);
1✔
125

126
    byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata);
1✔
127
    for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
128
      resp.addHeader(
1✔
129
          new String(serializedHeaders[i], StandardCharsets.US_ASCII),
130
          new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
131
    }
132
  }
1✔
133

134
  final class ServletTransportState extends TransportState {
135

136
    private final SerializingExecutor transportThreadExecutor =
1✔
137
        new SerializingExecutor(MoreExecutors.directExecutor());
1✔
138

139
    private ServletTransportState(
140
        int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
1✔
141
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
142
    }
1✔
143

144
    @Override
145
    public void runOnTransportThread(Runnable r) {
146
      transportThreadExecutor.execute(r);
1✔
147
    }
1✔
148

149
    @Override
150
    public void bytesRead(int numBytes) {
151
      // no-op
152
      // no flow control yet
153
    }
1✔
154

155
    @Override
156
    public void deframeFailed(Throwable cause) {
157
      if (logger.isLoggable(FINE)) {
×
158
        logger.log(FINE, String.format("[{%s}] Exception processing message", logId), cause);
×
159
      }
160
      cancel(Status.fromThrowable(cause));
×
161
    }
×
162
  }
163

164
  private static final class ByteArrayWritableBuffer implements WritableBuffer {
165

166
    private final int capacity;
167
    final byte[] bytes;
168
    private int index;
169

170
    ByteArrayWritableBuffer(int capacityHint) {
1✔
171
      this.bytes = new byte[min(1024 * 1024,  max(4096, capacityHint))];
1✔
172
      this.capacity = bytes.length;
1✔
173
    }
1✔
174

175
    @Override
176
    public void write(byte[] src, int srcIndex, int length) {
177
      System.arraycopy(src, srcIndex, bytes, index, length);
1✔
178
      index += length;
1✔
179
    }
1✔
180

181
    @Override
182
    public void write(byte b) {
183
      bytes[index++] = b;
×
184
    }
×
185

186
    @Override
187
    public int writableBytes() {
188
      return capacity - index;
1✔
189
    }
190

191
    @Override
192
    public int readableBytes() {
193
      return index;
1✔
194
    }
195

196
    @Override
197
    public void release() {}
×
198
  }
199

200
  private final class GrpcWriteListener implements WriteListener {
1✔
201

202
    @Override
203
    public void onError(Throwable t) {
204
      if (logger.isLoggable(FINE)) {
×
205
        logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
×
206
      }
207

208
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
209
      // Else, the container will send RST_STREAM at the end.
210
      if (!resp.isCommitted()) {
×
211
        cancel(Status.fromThrowable(t));
×
212
      } else {
213
        transportState.runOnTransportThread(
×
214
            () -> transportState.transportReportStatus(Status.fromThrowable(t)));
×
215
      }
216
    }
×
217

218
    @Override
219
    public void onWritePossible() throws IOException {
220
      writer.onWritePossible();
1✔
221
    }
1✔
222
  }
223

224
  private final class Sink implements AbstractServerStream.Sink {
1✔
225
    final TrailerSupplier trailerSupplier = new TrailerSupplier();
1✔
226

227
    @Override
228
    public void writeHeaders(Metadata headers) {
229
      writeHeadersToServletResponse(headers);
1✔
230
      resp.setTrailerFields(trailerSupplier);
1✔
231
      try {
232
        writer.flush();
1✔
233
      } catch (IOException e) {
×
234
        logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e);
×
235
        cancel(Status.fromThrowable(e));
×
236
      }
1✔
237
    }
1✔
238

239
    @Override
240
    public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) {
241
      if (frame == null && !flush) {
1✔
242
        return;
×
243
      }
244

245
      if (logger.isLoggable(FINEST)) {
1✔
246
        logger.log(
×
247
            FINEST,
248
            "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
249
            new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
×
250
      }
251

252
      try {
253
        if (frame != null) {
1✔
254
          int numBytes = frame.readableBytes();
1✔
255
          if (numBytes > 0) {
1✔
256
            onSendingBytes(numBytes);
1✔
257
          }
258
          writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes());
1✔
259
        }
260

261
        if (flush) {
1✔
262
          writer.flush();
1✔
263
        }
264
      } catch (IOException e) {
×
265
        logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e);
×
266
        cancel(Status.fromThrowable(e));
×
267
      }
1✔
268
    }
1✔
269

270
    @Override
271
    public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
272
      if (logger.isLoggable(FINE)) {
1✔
273
        logger.log(
×
274
            FINE,
275
            "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
276
            new Object[] {logId, trailers, headersSent, status});
×
277
      }
278
      if (!headersSent) {
1✔
279
        writeHeadersToServletResponse(trailers);
1✔
280
      } else {
281
        byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
1✔
282
        for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
283
          String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
1✔
284
          String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
1✔
285
          trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
1✔
286
          trailerSupplier.get().putIfAbsent(key, newValue);
1✔
287
        }
288
      }
289

290
      writer.complete();
1✔
291
    }
1✔
292

293
    @Override
294
    public void cancel(Status status) {
295
      if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) {
1✔
296
        return; // let the servlet timeout, the container will sent RST_STREAM automatically
1✔
297
      }
298
      transportState.runOnTransportThread(() -> transportState.transportReportStatus(status));
1✔
299
      // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300
      close(Status.CANCELLED.withCause(status.asRuntimeException()), new Metadata());
1✔
301
      CountDownLatch countDownLatch = new CountDownLatch(1);
1✔
302
      transportState.runOnTransportThread(() -> {
1✔
303
        asyncCtx.complete();
1✔
304
        countDownLatch.countDown();
1✔
305
      });
1✔
306
      try {
307
        countDownLatch.await(5, TimeUnit.SECONDS);
1✔
308
      } catch (InterruptedException e) {
1✔
309
        Thread.currentThread().interrupt();
1✔
310
      }
1✔
311
    }
1✔
312
  }
313

314
  private static final class TrailerSupplier implements Supplier<Map<String, String>> {
315
    final Map<String, String> trailers = Collections.synchronizedMap(new HashMap<>());
1✔
316

317
    TrailerSupplier() {}
1✔
318

319
    @Override
320
    public Map<String, String> get() {
321
      return trailers;
1✔
322
    }
323
  }
324

325
  static String toHexString(byte[] bytes, int length) {
326
    String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64));
1✔
327
    if (length > 80) {
1✔
328
      hex += "...";
1✔
329
    }
330
    if (length > 64) {
1✔
331
      int offset = max(64, length - 16);
1✔
332
      hex += BaseEncoding.base16().encode(bytes, offset, length - offset);
1✔
333
    }
334
    return hex;
1✔
335
  }
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc