• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19677

05 Feb 2025 09:05AM CUT coverage: 88.604% (+0.03%) from 88.578%
#19677

Pull #11858

github

web-flow
Merge dad68ffd5 into ea3f644ee
Pull Request #11858: core: updates the backoff range as per the A6 redefinition

33773 of 38117 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.67
/../servlet/src/main/java/io/grpc/servlet/ServletAdapter.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
22
import static java.util.logging.Level.FINE;
23
import static java.util.logging.Level.FINEST;
24

25
import com.google.common.io.BaseEncoding;
26
import io.grpc.Attributes;
27
import io.grpc.ExperimentalApi;
28
import io.grpc.Grpc;
29
import io.grpc.InternalLogId;
30
import io.grpc.InternalMetadata;
31
import io.grpc.Metadata;
32
import io.grpc.ServerStreamTracer;
33
import io.grpc.Status;
34
import io.grpc.internal.GrpcUtil;
35
import io.grpc.internal.ReadableBuffers;
36
import io.grpc.internal.ServerTransportListener;
37
import io.grpc.internal.StatsTraceContext;
38
import java.io.IOException;
39
import java.net.InetSocketAddress;
40
import java.net.URI;
41
import java.net.URISyntaxException;
42
import java.nio.charset.StandardCharsets;
43
import java.util.ArrayList;
44
import java.util.Arrays;
45
import java.util.Enumeration;
46
import java.util.List;
47
import java.util.concurrent.TimeUnit;
48
import java.util.logging.Logger;
49
import javax.servlet.AsyncContext;
50
import javax.servlet.AsyncEvent;
51
import javax.servlet.AsyncListener;
52
import javax.servlet.ReadListener;
53
import javax.servlet.ServletInputStream;
54
import javax.servlet.http.HttpServletRequest;
55
import javax.servlet.http.HttpServletResponse;
56

57
/**
58
 * An adapter that transforms {@link HttpServletRequest} into gRPC request and lets a gRPC server
59
 * process it, and transforms the gRPC response into {@link HttpServletResponse}. An adapter can be
60
 * instantiated by {@link ServletServerBuilder#buildServletAdapter()}.
61
 *
62
 * <p>In a servlet, calling {@link #doPost(HttpServletRequest, HttpServletResponse)} inside {@link
63
 * javax.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} makes the servlet
64
 * backed by the gRPC server associated with the adapter. The servlet must support Asynchronous
65
 * Processing and must be deployed to a container that supports servlet 4.0 and enables HTTP/2.
66
 *
67
 * <p>The API is experimental. The authors would like to know more about the real use cases. Users
68
 * are welcome to provide feedback by commenting on
69
 * <a href="https://github.com/grpc/grpc-java/issues/5066">the tracking issue</a>.
70
 */
71
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
72
public final class ServletAdapter {
73

74
  static final Logger logger = Logger.getLogger(ServletAdapter.class.getName());
1✔
75

76
  private final ServerTransportListener transportListener;
77
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
78
  private final int maxInboundMessageSize;
79
  private final Attributes attributes;
80

81
  /**
   * Creates an adapter that feeds servlet requests to the given transport listener.
   *
   * <p>The listener is told that the transport is ready, starting from empty attributes; the
   * attributes it returns are kept and used as the base of every stream's attributes.
   *
   * @param transportListener listener that is notified of each stream created by this adapter
   * @param streamTracerFactories factories used to build the stats/trace context of each call
   * @param maxInboundMessageSize limit passed on to every stream created by this adapter
   */
  ServletAdapter(
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      int maxInboundMessageSize) {
    this.transportListener = transportListener;
    this.streamTracerFactories = streamTracerFactories;
    this.maxInboundMessageSize = maxInboundMessageSize;
    // Announce the transport last, once the adapter's own configuration is in place.
    this.attributes = transportListener.transportReady(Attributes.EMPTY);
  }
1✔
90

91
  /**
92
   * Call this method inside {@link javax.servlet.http.HttpServlet#doGet(HttpServletRequest,
93
   * HttpServletResponse)} to serve gRPC GET request.
94
   *
95
   * <p>This method is currently not implemented.
96
   *
97
   * <p>Note that in rare case gRPC client sends GET requests.
98
   *
99
   * <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
100
   * calling {@code resp.setBufferSize()} before invocation is allowed.
101
   */
102
  public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
103
    resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "GET method not supported");
×
104
  }
×
105

106
  /**
107
   * Call this method inside {@link javax.servlet.http.HttpServlet#doPost(HttpServletRequest,
108
   * HttpServletResponse)} to serve gRPC POST request.
109
   *
110
   * <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
111
   * calling {@code resp.setBufferSize()} before invocation is allowed.
112
   */
113
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
114
    checkArgument(req.isAsyncSupported(), "servlet does not support asynchronous operation");
1✔
115
    checkArgument(ServletAdapter.isGrpc(req), "the request is not a gRPC request");
1✔
116

117
    InternalLogId logId = InternalLogId.allocate(ServletAdapter.class, null);
1✔
118
    logger.log(FINE, "[{0}] RPC started", logId);
1✔
119

120
    AsyncContext asyncCtx = req.startAsync(req, resp);
1✔
121

122
    String method = req.getRequestURI().substring(1); // remove the leading "/"
1✔
123
    Metadata headers = getHeaders(req);
1✔
124

125
    if (logger.isLoggable(FINEST)) {
1✔
126
      logger.log(FINEST, "[{0}] method: {1}", new Object[] {logId, method});
×
127
      logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers});
×
128
    }
129

130
    Long timeoutNanos = headers.get(TIMEOUT_KEY);
1✔
131
    if (timeoutNanos == null) {
1✔
132
      timeoutNanos = 0L;
1✔
133
    }
134
    asyncCtx.setTimeout(TimeUnit.NANOSECONDS.toMillis(timeoutNanos));
1✔
135
    StatsTraceContext statsTraceCtx =
1✔
136
        StatsTraceContext.newServerContext(streamTracerFactories, method, headers);
1✔
137

138
    ServletServerStream stream = new ServletServerStream(
1✔
139
        asyncCtx,
140
        statsTraceCtx,
141
        maxInboundMessageSize,
142
        attributes.toBuilder()
1✔
143
            .set(
1✔
144
                Grpc.TRANSPORT_ATTR_REMOTE_ADDR,
145
                new InetSocketAddress(req.getRemoteHost(), req.getRemotePort()))
1✔
146
            .set(
1✔
147
                Grpc.TRANSPORT_ATTR_LOCAL_ADDR,
148
                new InetSocketAddress(req.getLocalAddr(), req.getLocalPort()))
1✔
149
            .build(),
1✔
150
        getAuthority(req),
1✔
151
        logId);
152

153
    transportListener.streamCreated(stream, method, headers);
1✔
154
    stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);
1✔
155

156
    asyncCtx.getRequest().getInputStream()
1✔
157
        .setReadListener(new GrpcReadListener(stream, asyncCtx, logId));
1✔
158
    asyncCtx.addListener(new GrpcAsyncListener(stream, logId));
1✔
159
  }
1✔
160

161
  // This method must use Enumeration and its members, since that is the only way to read headers
162
  // from the servlet api.
163
  @SuppressWarnings("JdkObsolete")
164
  private static Metadata getHeaders(HttpServletRequest req) {
165
    Enumeration<String> headerNames = req.getHeaderNames();
1✔
166
    checkNotNull(
1✔
167
        headerNames, "Servlet container does not allow HttpServletRequest.getHeaderNames()");
168
    List<byte[]> byteArrays = new ArrayList<>();
1✔
169
    while (headerNames.hasMoreElements()) {
1✔
170
      String headerName = headerNames.nextElement();
1✔
171
      Enumeration<String> values = req.getHeaders(headerName);
1✔
172
      if (values == null) {
1✔
173
        continue;
×
174
      }
175
      while (values.hasMoreElements()) {
1✔
176
        String value = values.nextElement();
1✔
177
        if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
178
          byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
1✔
179
          byteArrays.add(BaseEncoding.base64().decode(value));
1✔
180
        } else {
181
          byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
1✔
182
          byteArrays.add(value.getBytes(StandardCharsets.US_ASCII));
1✔
183
        }
184
      }
1✔
185
    }
1✔
186
    return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][]{}));
1✔
187
  }
188

189
  // This method must use HttpRequest#getRequestURL or HttpUtils#getRequestURL, both of which
190
  // can only return StringBuffer instances
191
  @SuppressWarnings("JdkObsolete")
192
  private static String getAuthority(HttpServletRequest req) {
193
    try {
194
      return new URI(req.getRequestURL().toString()).getAuthority();
1✔
195
    } catch (URISyntaxException e) {
×
196
      logger.log(FINE, "Error getting authority from the request URL {0}", req.getRequestURL());
×
197
      return req.getServerName() + ":" + req.getServerPort();
×
198
    }
199
  }
200

201
  /**
202
   * Call this method when the adapter is no longer needed. The gRPC server will be terminated.
203
   */
204
  public void destroy() {
205
    transportListener.transportTerminated();
1✔
206
  }
1✔
207

208
  private static final class GrpcAsyncListener implements AsyncListener {
209
    final InternalLogId logId;
210
    final ServletServerStream stream;
211

212
    GrpcAsyncListener(ServletServerStream stream, InternalLogId logId) {
1✔
213
      this.stream = stream;
1✔
214
      this.logId = logId;
1✔
215
    }
1✔
216

217
    @Override
218
    public void onComplete(AsyncEvent event) {}
1✔
219

220
    @Override
221
    public void onTimeout(AsyncEvent event) {
222
      if (logger.isLoggable(FINE)) {
1✔
223
        logger.log(FINE, String.format("[{%s}] Timeout: ", logId), event.getThrowable());
×
224
      }
225
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
226
      // Else, the container will send RST_STREAM in the end.
227
      if (!event.getAsyncContext().getResponse().isCommitted()) {
1✔
228
        stream.cancel(Status.DEADLINE_EXCEEDED);
1✔
229
      } else {
230
        stream.transportState().runOnTransportThread(
×
231
            () -> stream.transportState().transportReportStatus(Status.DEADLINE_EXCEEDED));
×
232
      }
233
    }
1✔
234

235
    @Override
236
    public void onError(AsyncEvent event) {
237
      if (logger.isLoggable(FINE)) {
1✔
238
        logger.log(FINE, String.format("[{%s}] Error: ", logId), event.getThrowable());
×
239
      }
240

241
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
242
      // Else, the container will send RST_STREAM at the end.
243
      if (!event.getAsyncContext().getResponse().isCommitted()) {
1✔
244
        stream.cancel(Status.fromThrowable(event.getThrowable()));
×
245
      } else {
246
        stream.transportState().runOnTransportThread(
1✔
247
            () -> stream.transportState().transportReportStatus(
1✔
248
                Status.fromThrowable(event.getThrowable())));
1✔
249
      }
250
    }
1✔
251

252
    @Override
253
    public void onStartAsync(AsyncEvent event) {}
×
254
  }
255

256
  private static final class GrpcReadListener implements ReadListener {
257
    final ServletServerStream stream;
258
    final AsyncContext asyncCtx;
259
    final ServletInputStream input;
260
    final InternalLogId logId;
261

262
    GrpcReadListener(
263
        ServletServerStream stream,
264
        AsyncContext asyncCtx,
265
        InternalLogId logId) throws IOException {
1✔
266
      this.stream = stream;
1✔
267
      this.asyncCtx = asyncCtx;
1✔
268
      input = asyncCtx.getRequest().getInputStream();
1✔
269
      this.logId = logId;
1✔
270
    }
1✔
271

272
    final byte[] buffer = new byte[4 * 1024];
1✔
273

274
    @Override
275
    public void onDataAvailable() throws IOException {
276
      logger.log(FINEST, "[{0}] onDataAvailable: ENTRY", logId);
1✔
277

278
      while (input.isReady()) {
1✔
279
        int length = input.read(buffer);
1✔
280
        if (length == -1) {
1✔
281
          logger.log(FINEST, "[{0}] inbound data: read end of stream", logId);
×
282
          return;
×
283
        } else {
284
          if (logger.isLoggable(FINEST)) {
1✔
285
            logger.log(
×
286
                FINEST,
287
                "[{0}] inbound data: length = {1}, bytes = {2}",
288
                new Object[] {logId, length, ServletServerStream.toHexString(buffer, length)});
×
289
          }
290

291
          byte[] copy = Arrays.copyOf(buffer, length);
1✔
292
          stream.transportState().runOnTransportThread(
1✔
293
              () -> stream.transportState().inboundDataReceived(ReadableBuffers.wrap(copy), false));
1✔
294
        }
295
      }
1✔
296

297
      logger.log(FINEST, "[{0}] onDataAvailable: EXIT", logId);
1✔
298
    }
1✔
299

300
    @Override
301
    public void onAllDataRead() {
302
      logger.log(FINE, "[{0}] onAllDataRead", logId);
1✔
303
      stream.transportState().runOnTransportThread(() ->
1✔
304
          stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true));
1✔
305
    }
1✔
306

307
    @Override
308
    public void onError(Throwable t) {
309
      if (logger.isLoggable(FINE)) {
1✔
310
        logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
×
311
      }
312
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
313
      // Else, the container will send RST_STREAM at the end.
314
      if (!asyncCtx.getResponse().isCommitted()) {
1✔
315
        stream.cancel(Status.fromThrowable(t));
1✔
316
      } else {
317
        stream.transportState().runOnTransportThread(
×
318
            () -> stream.transportState()
×
319
                .transportReportStatus(Status.fromThrowable(t)));
×
320
      }
321
    }
1✔
322
  }
323

324
  /**
325
   * Checks whether an incoming {@code HttpServletRequest} may come from a gRPC client.
326
   *
327
   * @return true if the request comes from a gRPC client
328
   */
329
  public static boolean isGrpc(HttpServletRequest request) {
330
    return request.getContentType() != null
1✔
331
        && request.getContentType().contains(GrpcUtil.CONTENT_TYPE_GRPC);
1✔
332
  }
333
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc