• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19678

05 Feb 2025 06:37PM CUT coverage: 88.566% (-0.03%) from 88.592%
#19678

push

github

web-flow
xds: Improve XdsNR's selectConfig() variable handling

The variables from the do-while are no longer initialized, which lets the
compiler verify that the loop sets each of them. Unnecessary comparisons to
null are also removed; that they were unnecessary is now more obvious, as the
variables are never set to null. Added a minor optimization of computing the
RPC's path once instead of once for each route. The variable declarations were
also sorted to match their initialization order.

This does fix an unlikely bug: if the old code successfully matched a route
but failed to retain the cluster, then on the second attempt, if the route was
_not_ matched, it would re-use the prior route and thus infinite-loop failing
to retain that same cluster.

It also adds a missing cast to unsigned long for a uint32 weight. The old
code would detect if the _sum_ was negative, but a single weight that needs
all 32 bits (i.e. has its high bit set) would have been read as negative and
never selected.

33755 of 38113 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.81
/../servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC;
20
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
21
import static java.lang.Math.max;
22
import static java.lang.Math.min;
23
import static java.util.logging.Level.FINE;
24
import static java.util.logging.Level.FINEST;
25
import static java.util.logging.Level.WARNING;
26

27
import com.google.common.io.BaseEncoding;
28
import com.google.common.util.concurrent.MoreExecutors;
29
import io.grpc.Attributes;
30
import io.grpc.InternalLogId;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.Status.Code;
34
import io.grpc.internal.AbstractServerStream;
35
import io.grpc.internal.GrpcUtil;
36
import io.grpc.internal.SerializingExecutor;
37
import io.grpc.internal.StatsTraceContext;
38
import io.grpc.internal.TransportFrameUtil;
39
import io.grpc.internal.TransportTracer;
40
import io.grpc.internal.WritableBuffer;
41
import java.io.IOException;
42
import java.nio.charset.StandardCharsets;
43
import java.util.Collections;
44
import java.util.HashMap;
45
import java.util.Map;
46
import java.util.concurrent.CountDownLatch;
47
import java.util.concurrent.TimeUnit;
48
import java.util.function.Supplier;
49
import java.util.logging.Logger;
50
import javax.annotation.Nullable;
51
import javax.servlet.AsyncContext;
52
import javax.servlet.WriteListener;
53
import javax.servlet.http.HttpServletResponse;
54

55
final class ServletServerStream extends AbstractServerStream {
56

57
  private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName());
1✔
58

59
  private final ServletTransportState transportState;
60
  private final Sink sink = new Sink();
1✔
61
  private final AsyncContext asyncCtx;
62
  private final HttpServletResponse resp;
63
  private final Attributes attributes;
64
  private final String authority;
65
  private final InternalLogId logId;
66
  private final AsyncServletOutputStreamWriter writer;
67

68
  ServletServerStream(
69
      AsyncContext asyncCtx,
70
      StatsTraceContext statsTraceCtx,
71
      int maxInboundMessageSize,
72
      Attributes attributes,
73
      String authority,
74
      InternalLogId logId) throws IOException {
75
    super(ByteArrayWritableBuffer::new, statsTraceCtx);
1✔
76
    transportState =
1✔
77
        new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
78
    this.attributes = attributes;
1✔
79
    this.authority = authority;
1✔
80
    this.logId = logId;
1✔
81
    this.asyncCtx = asyncCtx;
1✔
82
    this.resp = (HttpServletResponse) asyncCtx.getResponse();
1✔
83
    this.writer = new AsyncServletOutputStreamWriter(
1✔
84
        asyncCtx, transportState, logId);
85
    resp.getOutputStream().setWriteListener(new GrpcWriteListener());
1✔
86
  }
1✔
87

88
  @Override
89
  protected ServletTransportState transportState() {
90
    return transportState;
1✔
91
  }
92

93
  @Override
94
  public Attributes getAttributes() {
95
    return attributes;
1✔
96
  }
97

98
  @Override
99
  public String getAuthority() {
100
    return authority;
1✔
101
  }
102

103
  @Override
104
  public int streamId() {
105
    return -1;
1✔
106
  }
107

108
  @Override
109
  protected Sink abstractServerStreamSink() {
110
    return sink;
1✔
111
  }
112

113
  private void writeHeadersToServletResponse(Metadata metadata) {
114
    // Discard any application supplied duplicates of the reserved headers
115
    metadata.discardAll(CONTENT_TYPE_KEY);
1✔
116
    metadata.discardAll(GrpcUtil.TE_HEADER);
1✔
117
    metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
1✔
118

119
    if (logger.isLoggable(FINE)) {
1✔
120
      logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata});
×
121
    }
122

123
    resp.setStatus(HttpServletResponse.SC_OK);
1✔
124
    resp.setContentType(CONTENT_TYPE_GRPC);
1✔
125

126
    byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata);
1✔
127
    for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
128
      resp.addHeader(
1✔
129
          new String(serializedHeaders[i], StandardCharsets.US_ASCII),
130
          new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
131
    }
132
  }
1✔
133

134
  final class ServletTransportState extends TransportState {
135

136
    private final SerializingExecutor transportThreadExecutor =
1✔
137
        new SerializingExecutor(MoreExecutors.directExecutor());
1✔
138

139
    private ServletTransportState(
140
        int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
1✔
141
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
142
    }
1✔
143

144
    @Override
145
    public void runOnTransportThread(Runnable r) {
146
      transportThreadExecutor.execute(r);
1✔
147
    }
1✔
148

149
    @Override
150
    public void bytesRead(int numBytes) {
151
      // no-op
152
      // no flow control yet
153
    }
1✔
154

155
    @Override
156
    public void deframeFailed(Throwable cause) {
157
      if (logger.isLoggable(WARNING)) {
×
158
        logger.log(WARNING, String.format("[{%s}] Exception processing message", logId), cause);
×
159
      }
160
      cancel(Status.fromThrowable(cause));
×
161
    }
×
162
  }
163

164
  private static final class ByteArrayWritableBuffer implements WritableBuffer {
165

166
    private final int capacity;
167
    final byte[] bytes;
168
    private int index;
169

170
    ByteArrayWritableBuffer(int capacityHint) {
1✔
171
      this.bytes = new byte[min(1024 * 1024, capacityHint)];
1✔
172
      this.capacity = bytes.length;
1✔
173
    }
1✔
174

175
    @Override
176
    public void write(byte[] src, int srcIndex, int length) {
177
      System.arraycopy(src, srcIndex, bytes, index, length);
1✔
178
      index += length;
1✔
179
    }
1✔
180

181
    @Override
182
    public void write(byte b) {
183
      bytes[index++] = b;
×
184
    }
×
185

186
    @Override
187
    public int writableBytes() {
188
      return capacity - index;
1✔
189
    }
190

191
    @Override
192
    public int readableBytes() {
193
      return index;
1✔
194
    }
195

196
    @Override
197
    public void release() {}
×
198
  }
199

200
  private final class GrpcWriteListener implements WriteListener {
1✔
201

202
    @Override
203
    public void onError(Throwable t) {
204
      if (logger.isLoggable(FINE)) {
×
205
        logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
×
206
      }
207

208
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
209
      // Else, the container will send RST_STREAM at the end.
210
      if (!resp.isCommitted()) {
×
211
        cancel(Status.fromThrowable(t));
×
212
      } else {
213
        transportState.runOnTransportThread(
×
214
            () -> transportState.transportReportStatus(Status.fromThrowable(t)));
×
215
      }
216
    }
×
217

218
    @Override
219
    public void onWritePossible() throws IOException {
220
      writer.onWritePossible();
1✔
221
    }
1✔
222
  }
223

224
  private final class Sink implements AbstractServerStream.Sink {
1✔
225
    final TrailerSupplier trailerSupplier = new TrailerSupplier();
1✔
226

227
    @Override
228
    public void writeHeaders(Metadata headers, boolean flush) {
229
      writeHeadersToServletResponse(headers);
1✔
230
      resp.setTrailerFields(trailerSupplier);
1✔
231
      try {
232
        writer.flush();
1✔
233
      } catch (IOException e) {
×
234
        logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e);
×
235
        cancel(Status.fromThrowable(e));
×
236
      }
1✔
237
    }
1✔
238

239
    @Override
240
    public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) {
241
      if (frame == null && !flush) {
1✔
242
        return;
×
243
      }
244

245
      if (logger.isLoggable(FINEST)) {
1✔
246
        logger.log(
×
247
            FINEST,
248
            "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
249
            new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
×
250
      }
251

252
      try {
253
        if (frame != null) {
1✔
254
          int numBytes = frame.readableBytes();
1✔
255
          if (numBytes > 0) {
1✔
256
            onSendingBytes(numBytes);
1✔
257
          }
258
          writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes());
1✔
259
        }
260

261
        if (flush) {
1✔
262
          writer.flush();
1✔
263
        }
264
      } catch (IOException e) {
×
265
        logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e);
×
266
        cancel(Status.fromThrowable(e));
×
267
      }
1✔
268
    }
1✔
269

270
    @Override
271
    public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
272
      if (logger.isLoggable(FINE)) {
1✔
273
        logger.log(
×
274
            FINE,
275
            "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
276
            new Object[] {logId, trailers, headersSent, status});
×
277
      }
278
      if (!headersSent) {
1✔
279
        writeHeadersToServletResponse(trailers);
1✔
280
      } else {
281
        byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
1✔
282
        for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
283
          String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
1✔
284
          String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
1✔
285
          trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
1✔
286
          trailerSupplier.get().putIfAbsent(key, newValue);
1✔
287
        }
288
      }
289

290
      writer.complete();
1✔
291
    }
1✔
292

293
    @Override
294
    public void cancel(Status status) {
295
      if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) {
1✔
296
        return; // let the servlet timeout, the container will sent RST_STREAM automatically
1✔
297
      }
298
      transportState.runOnTransportThread(() -> transportState.transportReportStatus(status));
1✔
299
      // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300
      close(Status.CANCELLED.withCause(status.asRuntimeException()), new Metadata());
1✔
301
      CountDownLatch countDownLatch = new CountDownLatch(1);
1✔
302
      transportState.runOnTransportThread(() -> {
1✔
303
        asyncCtx.complete();
1✔
304
        countDownLatch.countDown();
1✔
305
      });
1✔
306
      try {
307
        countDownLatch.await(5, TimeUnit.SECONDS);
1✔
308
      } catch (InterruptedException e) {
1✔
309
        Thread.currentThread().interrupt();
1✔
310
      }
1✔
311
    }
1✔
312
  }
313

314
  private static final class TrailerSupplier implements Supplier<Map<String, String>> {
315
    final Map<String, String> trailers = Collections.synchronizedMap(new HashMap<>());
1✔
316

317
    TrailerSupplier() {}
1✔
318

319
    @Override
320
    public Map<String, String> get() {
321
      return trailers;
1✔
322
    }
323
  }
324

325
  static String toHexString(byte[] bytes, int length) {
326
    String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64));
×
327
    if (length > 80) {
×
328
      hex += "...";
×
329
    }
330
    if (length > 64) {
×
331
      int offset = max(64, length - 16);
×
332
      hex += BaseEncoding.base16().encode(bytes, offset, length - offset);
×
333
    }
334
    return hex;
×
335
  }
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc