• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19960

26 Aug 2025 10:27AM UTC coverage: 88.555% (-0.002%) from 88.557%
#19960

push

github

web-flow
binder: Improve error descriptions for ServiceConnection callbacks (#12263)

Non-experts don't really know what these ServiceConnection callback
names mean (eg b/437170499). Use the Status description to explain them
a bit. Compare to https://github.com/grpc/grpc-java/pull/11628

34693 of 39177 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.34
/../servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC;
20
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
21
import static java.lang.Math.max;
22
import static java.lang.Math.min;
23
import static java.util.logging.Level.FINE;
24
import static java.util.logging.Level.FINEST;
25
import static java.util.logging.Level.WARNING;
26

27
import com.google.common.io.BaseEncoding;
28
import com.google.common.util.concurrent.MoreExecutors;
29
import io.grpc.Attributes;
30
import io.grpc.InternalLogId;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.internal.AbstractServerStream;
34
import io.grpc.internal.GrpcUtil;
35
import io.grpc.internal.SerializingExecutor;
36
import io.grpc.internal.StatsTraceContext;
37
import io.grpc.internal.TransportFrameUtil;
38
import io.grpc.internal.TransportTracer;
39
import io.grpc.internal.WritableBuffer;
40
import java.io.IOException;
41
import java.nio.charset.StandardCharsets;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.Map;
45
import java.util.function.Supplier;
46
import java.util.logging.Logger;
47
import javax.annotation.Nullable;
48
import javax.servlet.AsyncContext;
49
import javax.servlet.WriteListener;
50
import javax.servlet.http.HttpServletResponse;
51

52
final class ServletServerStream extends AbstractServerStream {
53

54
  private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName());
1✔
55

56
  private final ServletTransportState transportState;
57
  private final Sink sink = new Sink();
1✔
58
  private final HttpServletResponse resp;
59
  private final Attributes attributes;
60
  private final String authority;
61
  private final InternalLogId logId;
62
  private final AsyncServletOutputStreamWriter writer;
63
  /**
64
   * If the async servlet operation has been completed.
65
   */
66
  volatile boolean asyncCompleted = false;
1✔
67

68
  ServletServerStream(
69
      AsyncContext asyncCtx,
70
      StatsTraceContext statsTraceCtx,
71
      int maxInboundMessageSize,
72
      Attributes attributes,
73
      String authority,
74
      InternalLogId logId) throws IOException {
75
    super(ByteArrayWritableBuffer::new, statsTraceCtx);
1✔
76
    transportState =
1✔
77
        new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
78
    this.attributes = attributes;
1✔
79
    this.authority = authority;
1✔
80
    this.logId = logId;
1✔
81
    this.resp = (HttpServletResponse) asyncCtx.getResponse();
1✔
82
    this.writer = new AsyncServletOutputStreamWriter(
1✔
83
        asyncCtx, transportState, logId);
84
    resp.getOutputStream().setWriteListener(new GrpcWriteListener());
1✔
85
  }
1✔
86

87
  @Override
88
  protected ServletTransportState transportState() {
89
    return transportState;
1✔
90
  }
91

92
  @Override
93
  public Attributes getAttributes() {
94
    return attributes;
1✔
95
  }
96

97
  @Override
98
  public String getAuthority() {
99
    return authority;
1✔
100
  }
101

102
  @Override
103
  public int streamId() {
104
    return -1;
1✔
105
  }
106

107
  @Override
108
  protected Sink abstractServerStreamSink() {
109
    return sink;
1✔
110
  }
111

112
  private void writeHeadersToServletResponse(Metadata metadata) {
113
    // Discard any application supplied duplicates of the reserved headers
114
    metadata.discardAll(CONTENT_TYPE_KEY);
1✔
115
    metadata.discardAll(GrpcUtil.TE_HEADER);
1✔
116
    metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
1✔
117

118
    if (logger.isLoggable(FINE)) {
1✔
119
      logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata});
×
120
    }
121

122
    resp.setStatus(HttpServletResponse.SC_OK);
1✔
123
    resp.setContentType(CONTENT_TYPE_GRPC);
1✔
124

125
    byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata);
1✔
126
    for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
127
      resp.addHeader(
1✔
128
          new String(serializedHeaders[i], StandardCharsets.US_ASCII),
129
          new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
130
    }
131
  }
1✔
132

133
  final class ServletTransportState extends TransportState {
134

135
    private final SerializingExecutor transportThreadExecutor =
1✔
136
        new SerializingExecutor(MoreExecutors.directExecutor());
1✔
137

138
    private ServletTransportState(
139
        int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
1✔
140
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
141
    }
1✔
142

143
    @Override
144
    public void runOnTransportThread(Runnable r) {
145
      transportThreadExecutor.execute(r);
1✔
146
    }
1✔
147

148
    @Override
149
    public void bytesRead(int numBytes) {
150
      // no-op
151
      // no flow control yet
152
    }
1✔
153

154
    @Override
155
    public void deframeFailed(Throwable cause) {
156
      if (logger.isLoggable(WARNING)) {
×
157
        logger.log(WARNING, String.format("[{%s}] Exception processing message", logId), cause);
×
158
      }
159
      cancel(Status.fromThrowable(cause));
×
160
    }
×
161
  }
162

163
  private static final class ByteArrayWritableBuffer implements WritableBuffer {
164

165
    private final int capacity;
166
    final byte[] bytes;
167
    private int index;
168

169
    ByteArrayWritableBuffer(int capacityHint) {
1✔
170
      this.bytes = new byte[min(1024 * 1024, capacityHint)];
1✔
171
      this.capacity = bytes.length;
1✔
172
    }
1✔
173

174
    @Override
175
    public void write(byte[] src, int srcIndex, int length) {
176
      System.arraycopy(src, srcIndex, bytes, index, length);
1✔
177
      index += length;
1✔
178
    }
1✔
179

180
    @Override
181
    public void write(byte b) {
182
      bytes[index++] = b;
×
183
    }
×
184

185
    @Override
186
    public int writableBytes() {
187
      return capacity - index;
1✔
188
    }
189

190
    @Override
191
    public int readableBytes() {
192
      return index;
1✔
193
    }
194

195
    @Override
196
    public void release() {}
×
197
  }
198

199
  private final class GrpcWriteListener implements WriteListener {
1✔
200

201
    @Override
202
    public void onError(Throwable t) {
203
      if (logger.isLoggable(FINE)) {
×
204
        logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
×
205
      }
206

207
      // If the resp is not committed, cancel() to avoid being redirected to an error page.
208
      // Else, the container will send RST_STREAM at the end.
209
      if (!resp.isCommitted()) {
×
210
        cancel(Status.fromThrowable(t));
×
211
      } else {
212
        transportState.runOnTransportThread(
×
213
            () -> transportState.transportReportStatus(Status.fromThrowable(t)));
×
214
      }
215
    }
×
216

217
    @Override
218
    public void onWritePossible() throws IOException {
219
      writer.onWritePossible();
1✔
220
    }
1✔
221
  }
222

223
  private final class Sink implements AbstractServerStream.Sink {
1✔
224
    final TrailerSupplier trailerSupplier = new TrailerSupplier();
1✔
225

226
    @Override
227
    public void writeHeaders(Metadata headers, boolean flush) {
228
      writeHeadersToServletResponse(headers);
1✔
229
      resp.setTrailerFields(trailerSupplier);
1✔
230
      try {
231
        writer.flush();
1✔
232
      } catch (IOException e) {
×
233
        logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e);
×
234
        cancel(Status.fromThrowable(e));
×
235
      }
1✔
236
    }
1✔
237

238
    @Override
239
    public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) {
240
      if (frame == null && !flush) {
1✔
241
        return;
×
242
      }
243

244
      if (logger.isLoggable(FINEST)) {
1✔
245
        logger.log(
×
246
            FINEST,
247
            "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
248
            new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
×
249
      }
250

251
      try {
252
        if (frame != null) {
1✔
253
          int numBytes = frame.readableBytes();
1✔
254
          if (numBytes > 0) {
1✔
255
            onSendingBytes(numBytes);
1✔
256
          }
257
          writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes());
1✔
258
        }
259

260
        if (flush) {
1✔
261
          writer.flush();
1✔
262
        }
263
      } catch (IOException e) {
×
264
        logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e);
×
265
        cancel(Status.fromThrowable(e));
×
266
      }
1✔
267
    }
1✔
268

269
    @Override
270
    public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
271
      if (logger.isLoggable(FINE)) {
1✔
272
        logger.log(
×
273
            FINE,
274
            "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
275
            new Object[] {logId, trailers, headersSent, status});
×
276
      }
277
      if (!headersSent) {
1✔
278
        writeHeadersToServletResponse(trailers);
1✔
279
      } else {
280
        byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
1✔
281
        for (int i = 0; i < serializedHeaders.length; i += 2) {
1✔
282
          String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
1✔
283
          String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
1✔
284
          trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
1✔
285
          trailerSupplier.get().putIfAbsent(key, newValue);
1✔
286
        }
287
      }
288

289
      writer.complete();
1✔
290
    }
1✔
291

292
    @Override
293
    public void cancel(Status status) {
294
      transportState.runOnTransportThread(() -> transportState.transportReportStatus(status));
1✔
295
      if (asyncCompleted) {
1✔
296
        logger.fine("ignore cancel as already completed");
×
297
        return;
×
298
      }
299
      // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300
      close(status, new Metadata());
1✔
301
      // close() calls writeTrailers(), which calls AsyncContext.complete()
302
    }
1✔
303
  }
304

305
  private static final class TrailerSupplier implements Supplier<Map<String, String>> {
306
    final Map<String, String> trailers = Collections.synchronizedMap(new HashMap<>());
1✔
307

308
    TrailerSupplier() {}
1✔
309

310
    @Override
311
    public Map<String, String> get() {
312
      return trailers;
1✔
313
    }
314
  }
315

316
  static String toHexString(byte[] bytes, int length) {
317
    String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64));
×
318
    if (length > 80) {
×
319
      hex += "...";
×
320
    }
321
    if (length > 64) {
×
322
      int offset = max(64, length - 16);
×
323
      hex += BaseEncoding.base16().encode(bytes, offset, length - offset);
×
324
    }
325
    return hex;
×
326
  }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc