• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19674

31 Jan 2025 12:10AM CUT coverage: 88.56% (-0.03%) from 88.586%
#19674

push

github

ejona86
alts: Add ClientCall support to AltsContextUtil

This adds a createFrom(Attributes) to mirror the check(Attributes) added
in ba8ab79. It also adds conveniences for ClientCall for both
createFrom() and check(). This allows getting peer information from
ClientCall and CallCredentials.RequestInfo, as was already available
from ServerCall.

The tests were reworked to cover the Attribute-based methods thoroughly,
leaving only basic tests for the client/server conveniences.

Fixes #11042

33752 of 38112 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.67
/../servlet/src/main/java/io/grpc/servlet/ServletAdapter.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
22
import static java.util.logging.Level.FINE;
23
import static java.util.logging.Level.FINEST;
24

25
import com.google.common.io.BaseEncoding;
26
import io.grpc.Attributes;
27
import io.grpc.ExperimentalApi;
28
import io.grpc.Grpc;
29
import io.grpc.InternalLogId;
30
import io.grpc.InternalMetadata;
31
import io.grpc.Metadata;
32
import io.grpc.ServerStreamTracer;
33
import io.grpc.Status;
34
import io.grpc.internal.GrpcUtil;
35
import io.grpc.internal.ReadableBuffers;
36
import io.grpc.internal.ServerTransportListener;
37
import io.grpc.internal.StatsTraceContext;
38
import java.io.IOException;
39
import java.net.InetSocketAddress;
40
import java.net.URI;
41
import java.net.URISyntaxException;
42
import java.nio.charset.StandardCharsets;
43
import java.util.ArrayList;
44
import java.util.Arrays;
45
import java.util.Enumeration;
46
import java.util.List;
47
import java.util.concurrent.TimeUnit;
48
import java.util.logging.Logger;
49
import javax.servlet.AsyncContext;
50
import javax.servlet.AsyncEvent;
51
import javax.servlet.AsyncListener;
52
import javax.servlet.ReadListener;
53
import javax.servlet.ServletInputStream;
54
import javax.servlet.http.HttpServletRequest;
55
import javax.servlet.http.HttpServletResponse;
56

57
/**
58
 * An adapter that transforms {@link HttpServletRequest} into gRPC request and lets a gRPC server
59
 * process it, and transforms the gRPC response into {@link HttpServletResponse}. An adapter can be
60
 * instantiated by {@link ServletServerBuilder#buildServletAdapter()}.
61
 *
62
 * <p>In a servlet, calling {@link #doPost(HttpServletRequest, HttpServletResponse)} inside {@link
63
 * javax.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} makes the servlet
64
 * backed by the gRPC server associated with the adapter. The servlet must support Asynchronous
65
 * Processing and must be deployed to a container that supports servlet 4.0 and enables HTTP/2.
66
 *
67
 * <p>The API is experimental. The authors would like to know more about the real use cases. Users
68
 * are welcome to provide feedback by commenting on
69
 * <a href="https://github.com/grpc/grpc-java/issues/5066">the tracking issue</a>.
70
 */
71
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
72
public final class ServletAdapter {
73

74
  static final Logger logger = Logger.getLogger(ServletAdapter.class.getName());

  /** Receives transport lifecycle callbacks and every stream created by {@code doPost}. */
  private final ServerTransportListener transportListener;

  /** Tracer factories handed to {@link StatsTraceContext} for each incoming RPC. */
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;

  /** Inbound message size limit passed to every {@link ServletServerStream}. */
  private final int maxInboundMessageSize;

  /** Attributes returned by the listener's {@code transportReady}; base for per-stream ones. */
  private final Attributes attributes;

  /**
   * Creates an adapter and immediately reports the transport as ready to the listener.
   *
   * @param transportListener listener that is notified of new streams and of termination
   * @param streamTracerFactories tracer factories applied to each RPC
   * @param maxInboundMessageSize inbound message size limit given to each stream
   */
  ServletAdapter(
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      int maxInboundMessageSize) {
    this.transportListener = transportListener;
    this.streamTracerFactories = streamTracerFactories;
    this.maxInboundMessageSize = maxInboundMessageSize;
    // The listener may decorate the (initially empty) transport attributes; keep its result.
    this.attributes = transportListener.transportReady(Attributes.EMPTY);
  }
1✔
90

91
  /**
92
   * Call this method inside {@link javax.servlet.http.HttpServlet#doGet(HttpServletRequest,
93
   * HttpServletResponse)} to serve gRPC GET request.
94
   *
95
   * <p>This method is currently not implemented.
96
   *
97
   * <p>Note that in rare case gRPC client sends GET requests.
98
   *
99
   * <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
100
   * calling {@code resp.setBufferSize()} before invocation is allowed.
101
   */
102
  public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
103
    resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "GET method not supported");
×
104
  }
×
105

106
  /**
107
   * Call this method inside {@link javax.servlet.http.HttpServlet#doPost(HttpServletRequest,
108
   * HttpServletResponse)} to serve gRPC POST request.
109
   *
110
   * <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
111
   * calling {@code resp.setBufferSize()} before invocation is allowed.
112
   */
113
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
114
    checkArgument(req.isAsyncSupported(), "servlet does not support asynchronous operation");
1✔
115
    checkArgument(ServletAdapter.isGrpc(req), "the request is not a gRPC request");
1✔
116

117
    InternalLogId logId = InternalLogId.allocate(ServletAdapter.class, null);
1✔
118
    logger.log(FINE, "[{0}] RPC started", logId);
1✔
119

120
    AsyncContext asyncCtx = req.startAsync(req, resp);
1✔
121

122
    String method = req.getRequestURI().substring(1); // remove the leading "/"
1✔
123
    Metadata headers = getHeaders(req);
1✔
124

125
    if (logger.isLoggable(FINEST)) {
1✔
126
      logger.log(FINEST, "[{0}] method: {1}", new Object[] {logId, method});
×
127
      logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers});
×
128
    }
129

130
    Long timeoutNanos = headers.get(TIMEOUT_KEY);
1✔
131
    if (timeoutNanos == null) {
1✔
132
      timeoutNanos = 0L;
1✔
133
    }
134
    asyncCtx.setTimeout(TimeUnit.NANOSECONDS.toMillis(timeoutNanos));
1✔
135
    StatsTraceContext statsTraceCtx =
1✔
136
        StatsTraceContext.newServerContext(streamTracerFactories, method, headers);
1✔
137

138
    ServletServerStream stream = new ServletServerStream(
1✔
139
        asyncCtx,
140
        statsTraceCtx,
141
        maxInboundMessageSize,
142
        attributes.toBuilder()
1✔
143
            .set(
1✔
144
                Grpc.TRANSPORT_ATTR_REMOTE_ADDR,
145
                new InetSocketAddress(req.getRemoteHost(), req.getRemotePort()))
1✔
146
            .set(
1✔
147
                Grpc.TRANSPORT_ATTR_LOCAL_ADDR,
148
                new InetSocketAddress(req.getLocalAddr(), req.getLocalPort()))
1✔
149
            .build(),
1✔
150
        getAuthority(req),
1✔
151
        logId);
152

153
    transportListener.streamCreated(stream, method, headers);
1✔
154
    stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);
1✔
155

156
    asyncCtx.getRequest().getInputStream()
1✔
157
        .setReadListener(new GrpcReadListener(stream, asyncCtx, logId));
1✔
158
    asyncCtx.addListener(new GrpcAsyncListener(stream, logId));
1✔
159
  }
1✔
160

161
  // This method must use Enumeration and its members, since that is the only way to read headers
162
  // from the servlet api.
163
  @SuppressWarnings("JdkObsolete")
164
  private static Metadata getHeaders(HttpServletRequest req) {
165
    Enumeration<String> headerNames = req.getHeaderNames();
1✔
166
    checkNotNull(
1✔
167
        headerNames, "Servlet container does not allow HttpServletRequest.getHeaderNames()");
168
    List<byte[]> byteArrays = new ArrayList<>();
1✔
169
    while (headerNames.hasMoreElements()) {
1✔
170
      String headerName = headerNames.nextElement();
1✔
171
      Enumeration<String> values = req.getHeaders(headerName);
1✔
172
      if (values == null) {
1✔
173
        continue;
×
174
      }
175
      while (values.hasMoreElements()) {
1✔
176
        String value = values.nextElement();
1✔
177
        if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
178
          byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
1✔
179
          byteArrays.add(BaseEncoding.base64().decode(value));
1✔
180
        } else {
181
          byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
1✔
182
          byteArrays.add(value.getBytes(StandardCharsets.US_ASCII));
1✔
183
        }
184
      }
1✔
185
    }
1✔
186
    return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][]{}));
1✔
187
  }
188

189
  // This method must use HttpRequest#getRequestURL or HttpUtils#getRequestURL, both of which
190
  // can only return StringBuffer instances
191
  @SuppressWarnings("JdkObsolete")
192
  private static String getAuthority(HttpServletRequest req) {
193
    try {
194
      return new URI(req.getRequestURL().toString()).getAuthority();
1✔
195
    } catch (URISyntaxException e) {
×
196
      logger.log(FINE, "Error getting authority from the request URL {0}", req.getRequestURL());
×
197
      return req.getServerName() + ":" + req.getServerPort();
×
198
    }
199
  }
200

201
  /**
202
   * Call this method when the adapter is no longer needed. The gRPC server will be terminated.
203
   */
204
  public void destroy() {
205
    transportListener.transportTerminated();
1✔
206
  }
1✔
207

208
  private static final class GrpcAsyncListener implements AsyncListener {
209
    final InternalLogId logId;
210
    final ServletServerStream stream;
211

212
    GrpcAsyncListener(ServletServerStream stream, InternalLogId logId) {
1✔
213
      this.stream = stream;
1✔
214
      this.logId = logId;
1✔
215
    }
1✔
216

217
    @Override
218
    public void onComplete(AsyncEvent event) {}
1✔
219

220
    @Override
221
    public void onTimeout(AsyncEvent event) {
222
      if (logger.isLoggable(FINE)) {
1✔
223
        logger.log(FINE, String.format("[{%s}] Timeout: ", logId), event.getThrowable());
×
224
      }
225
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
226
      // Else, the container will send RST_STREAM in the end.
227
      if (!event.getAsyncContext().getResponse().isCommitted()) {
1✔
228
        stream.cancel(Status.DEADLINE_EXCEEDED);
1✔
229
      } else {
230
        stream.transportState().runOnTransportThread(
×
231
            () -> stream.transportState().transportReportStatus(Status.DEADLINE_EXCEEDED));
×
232
      }
233
    }
1✔
234

235
    @Override
236
    public void onError(AsyncEvent event) {
237
      if (logger.isLoggable(FINE)) {
1✔
238
        logger.log(FINE, String.format("[{%s}] Error: ", logId), event.getThrowable());
×
239
      }
240

241
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
242
      // Else, the container will send RST_STREAM at the end.
243
      if (!event.getAsyncContext().getResponse().isCommitted()) {
1✔
244
        stream.cancel(Status.fromThrowable(event.getThrowable()));
×
245
      } else {
246
        stream.transportState().runOnTransportThread(
1✔
247
            () -> stream.transportState().transportReportStatus(
1✔
248
                Status.fromThrowable(event.getThrowable())));
1✔
249
      }
250
    }
1✔
251

252
    @Override
253
    public void onStartAsync(AsyncEvent event) {}
×
254
  }
255

256
  private static final class GrpcReadListener implements ReadListener {
257
    final ServletServerStream stream;
258
    final AsyncContext asyncCtx;
259
    final ServletInputStream input;
260
    final InternalLogId logId;
261

262
    GrpcReadListener(
263
        ServletServerStream stream,
264
        AsyncContext asyncCtx,
265
        InternalLogId logId) throws IOException {
1✔
266
      this.stream = stream;
1✔
267
      this.asyncCtx = asyncCtx;
1✔
268
      input = asyncCtx.getRequest().getInputStream();
1✔
269
      this.logId = logId;
1✔
270
    }
1✔
271

272
    final byte[] buffer = new byte[4 * 1024];
1✔
273

274
    @Override
275
    public void onDataAvailable() throws IOException {
276
      logger.log(FINEST, "[{0}] onDataAvailable: ENTRY", logId);
1✔
277

278
      while (input.isReady()) {
1✔
279
        int length = input.read(buffer);
1✔
280
        if (length == -1) {
1✔
281
          logger.log(FINEST, "[{0}] inbound data: read end of stream", logId);
×
282
          return;
×
283
        } else {
284
          if (logger.isLoggable(FINEST)) {
1✔
285
            logger.log(
×
286
                FINEST,
287
                "[{0}] inbound data: length = {1}, bytes = {2}",
288
                new Object[] {logId, length, ServletServerStream.toHexString(buffer, length)});
×
289
          }
290

291
          byte[] copy = Arrays.copyOf(buffer, length);
1✔
292
          stream.transportState().runOnTransportThread(
1✔
293
              () -> stream.transportState().inboundDataReceived(ReadableBuffers.wrap(copy), false));
1✔
294
        }
295
      }
1✔
296

297
      logger.log(FINEST, "[{0}] onDataAvailable: EXIT", logId);
1✔
298
    }
1✔
299

300
    @Override
301
    public void onAllDataRead() {
302
      logger.log(FINE, "[{0}] onAllDataRead", logId);
1✔
303
      stream.transportState().runOnTransportThread(() ->
1✔
304
          stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true));
1✔
305
    }
1✔
306

307
    @Override
308
    public void onError(Throwable t) {
309
      if (logger.isLoggable(FINE)) {
1✔
310
        logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
×
311
      }
312
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
313
      // Else, the container will send RST_STREAM at the end.
314
      if (!asyncCtx.getResponse().isCommitted()) {
1✔
315
        stream.cancel(Status.fromThrowable(t));
1✔
316
      } else {
317
        stream.transportState().runOnTransportThread(
×
318
            () -> stream.transportState()
×
319
                .transportReportStatus(Status.fromThrowable(t)));
×
320
      }
321
    }
1✔
322
  }
323

324
  /**
325
   * Checks whether an incoming {@code HttpServletRequest} may come from a gRPC client.
326
   *
327
   * @return true if the request comes from a gRPC client
328
   */
329
  public static boolean isGrpc(HttpServletRequest request) {
330
    return request.getContentType() != null
1✔
331
        && request.getContentType().contains(GrpcUtil.CONTENT_TYPE_GRPC);
1✔
332
  }
333
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc