• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19108

21 Mar 2024 10:37PM UTC coverage: 88.277% (-0.002%) from 88.279%
#19108

push

github

web-flow
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener

31190 of 35332 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.38
/../util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.util;
18

19
import com.google.common.util.concurrent.MoreExecutors;
20
import com.google.common.util.concurrent.SettableFuture;
21
import io.grpc.Attributes;
22
import io.grpc.ExperimentalApi;
23
import io.grpc.ForwardingServerCall;
24
import io.grpc.ForwardingServerCallListener;
25
import io.grpc.Metadata;
26
import io.grpc.ServerCall;
27
import io.grpc.ServerCallHandler;
28
import io.grpc.ServerInterceptor;
29
import io.grpc.Status;
30
import io.grpc.StatusRuntimeException;
31
import io.grpc.internal.SerializingExecutor;
32
import java.util.concurrent.ExecutionException;
33
import javax.annotation.Nullable;
34

35
/**
36
 * A class that intercepts uncaught exceptions of type {@link StatusRuntimeException} and handles
37
 * them by closing the {@link ServerCall}, and transmitting the exception's status and metadata
38
 * to the client.
39
 *
40
 * <p>Without this interceptor, gRPC will strip all details and close the {@link ServerCall} with
41
 * a generic {@link Status#UNKNOWN} code.
42
 *
43
 * <p>Security warning: the {@link Status} and {@link Metadata} may contain sensitive server-side
44
 * state information, and generally should not be sent to clients. Only install this interceptor
45
 * if all clients are trusted.
46
 */
47
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2189")
48
public final class TransmitStatusRuntimeExceptionInterceptor implements ServerInterceptor {
49
  /** Not publicly constructible; callers obtain instances through {@link #instance()}. */
  private TransmitStatusRuntimeExceptionInterceptor() {
    // Intentionally empty: this class declares no instance fields to initialize.
  }
51

52
  public static ServerInterceptor instance() {
53
    return new TransmitStatusRuntimeExceptionInterceptor();
1✔
54
  }
55

56
  @Override
57
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
58
      ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
59
    final ServerCall<ReqT, RespT> serverCall = new SerializingServerCall<>(call);
1✔
60
    ServerCall.Listener<ReqT> listener = next.startCall(serverCall, headers);
1✔
61
    return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) {
1✔
62
      @Override
63
      public void onMessage(ReqT message) {
64
        try {
65
          super.onMessage(message);
×
66
        } catch (StatusRuntimeException e) {
1✔
67
          closeWithException(e);
1✔
68
        }
×
69
      }
1✔
70

71
      @Override
72
      public void onHalfClose() {
73
        try {
74
          super.onHalfClose();
×
75
        } catch (StatusRuntimeException e) {
1✔
76
          closeWithException(e);
1✔
77
        }
×
78
      }
1✔
79

80
      @Override
81
      public void onCancel() {
82
        try {
83
          super.onCancel();
×
84
        } catch (StatusRuntimeException e) {
1✔
85
          closeWithException(e);
1✔
86
        }
×
87
      }
1✔
88

89
      @Override
90
      public void onComplete() {
91
        try {
92
          super.onComplete();
×
93
        } catch (StatusRuntimeException e) {
1✔
94
          closeWithException(e);
1✔
95
        }
×
96
      }
1✔
97

98
      @Override
99
      public void onReady() {
100
        try {
101
          super.onReady();
×
102
        } catch (StatusRuntimeException e) {
1✔
103
          closeWithException(e);
1✔
104
        }
×
105
      }
1✔
106

107
      private void closeWithException(StatusRuntimeException t) {
108
        Metadata metadata = t.getTrailers();
1✔
109
        if (metadata == null) {
1✔
110
          metadata = new Metadata();
×
111
        }
112
        serverCall.close(t.getStatus(), metadata);
1✔
113
      }
1✔
114
    };
115
  }
116

117
  /**
118
   * A {@link ServerCall} that wraps around a non thread safe delegate and provides thread safe
119
   * access by serializing everything on an executor.
120
   */
121
  private static class SerializingServerCall<ReqT, RespT> extends
122
      ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {
123
    private static final String ERROR_MSG = "Encountered error during serialized access";
124
    private final SerializingExecutor serializingExecutor =
1✔
125
        new SerializingExecutor(MoreExecutors.directExecutor());
1✔
126
    private boolean closeCalled = false;
1✔
127

128
    SerializingServerCall(ServerCall<ReqT, RespT> delegate) {
129
      super(delegate);
1✔
130
    }
1✔
131

132
    @Override
133
    public void sendMessage(final RespT message) {
134
      serializingExecutor.execute(new Runnable() {
×
135
        @Override
136
        public void run() {
137
          SerializingServerCall.super.sendMessage(message);
×
138
        }
×
139
      });
140
    }
×
141

142
    @Override
143
    public void request(final int numMessages) {
144
      serializingExecutor.execute(new Runnable() {
×
145
        @Override
146
        public void run() {
147
          SerializingServerCall.super.request(numMessages);
×
148
        }
×
149
      });
150
    }
×
151

152
    @Override
153
    public void sendHeaders(final Metadata headers) {
154
      serializingExecutor.execute(new Runnable() {
×
155
        @Override
156
        public void run() {
157
          SerializingServerCall.super.sendHeaders(headers);
×
158
        }
×
159
      });
160
    }
×
161

162
    @Override
163
    public void close(final Status status, final Metadata trailers) {
164
      serializingExecutor.execute(new Runnable() {
1✔
165
        @Override
166
        public void run() {
167
          if (!closeCalled) {
1✔
168
            closeCalled = true;
1✔
169

170
            SerializingServerCall.super.close(status, trailers);
1✔
171
          }
172
        }
1✔
173
      });
174
    }
1✔
175

176
    @Override
177
    public boolean isReady() {
178
      final SettableFuture<Boolean> retVal = SettableFuture.create();
×
179
      serializingExecutor.execute(new Runnable() {
×
180
        @Override
181
        public void run() {
182
          retVal.set(SerializingServerCall.super.isReady());
×
183
        }
×
184
      });
185
      try {
186
        return retVal.get();
×
187
      } catch (InterruptedException e) {
×
188
        throw new RuntimeException(ERROR_MSG, e);
×
189
      } catch (ExecutionException e) {
×
190
        throw new RuntimeException(ERROR_MSG, e);
×
191
      }
192
    }
193

194
    @Override
195
    public boolean isCancelled() {
196
      final SettableFuture<Boolean> retVal = SettableFuture.create();
×
197
      serializingExecutor.execute(new Runnable() {
×
198
        @Override
199
        public void run() {
200
          retVal.set(SerializingServerCall.super.isCancelled());
×
201
        }
×
202
      });
203
      try {
204
        return retVal.get();
×
205
      } catch (InterruptedException e) {
×
206
        throw new RuntimeException(ERROR_MSG, e);
×
207
      } catch (ExecutionException e) {
×
208
        throw new RuntimeException(ERROR_MSG, e);
×
209
      }
210
    }
211

212
    @Override
213
    public void setMessageCompression(final boolean enabled) {
214
      serializingExecutor.execute(new Runnable() {
×
215
        @Override
216
        public void run() {
217
          SerializingServerCall.super.setMessageCompression(enabled);
×
218
        }
×
219
      });
220
    }
×
221

222
    @Override
223
    @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
224
    public void setOnReadyThreshold(final int numBytes) {
225
      serializingExecutor.execute(new Runnable() {
×
226
        @Override
227
        public void run() {
228
          SerializingServerCall.super.setOnReadyThreshold(numBytes);
×
229
        }
×
230
      });
231
    }
×
232

233
    @Override
234
    public void setCompression(final String compressor) {
235
      serializingExecutor.execute(new Runnable() {
×
236
        @Override
237
        public void run() {
238
          SerializingServerCall.super.setCompression(compressor);
×
239
        }
×
240
      });
241
    }
×
242

243
    @Override
244
    public Attributes getAttributes() {
245
      final SettableFuture<Attributes> retVal = SettableFuture.create();
×
246
      serializingExecutor.execute(new Runnable() {
×
247
        @Override
248
        public void run() {
249
          retVal.set(SerializingServerCall.super.getAttributes());
×
250
        }
×
251
      });
252
      try {
253
        return retVal.get();
×
254
      } catch (InterruptedException e) {
×
255
        throw new RuntimeException(ERROR_MSG, e);
×
256
      } catch (ExecutionException e) {
×
257
        throw new RuntimeException(ERROR_MSG, e);
×
258
      }
259
    }
260

261
    @Nullable
262
    @Override
263
    public String getAuthority() {
264
      final SettableFuture<String> retVal = SettableFuture.create();
×
265
      serializingExecutor.execute(new Runnable() {
×
266
        @Override
267
        public void run() {
268
          retVal.set(SerializingServerCall.super.getAuthority());
×
269
        }
×
270
      });
271
      try {
272
        return retVal.get();
×
273
      } catch (InterruptedException e) {
×
274
        throw new RuntimeException(ERROR_MSG, e);
×
275
      } catch (ExecutionException e) {
×
276
        throw new RuntimeException(ERROR_MSG, e);
×
277
      }
278
    }
279
  }
280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc