• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18813

29 Aug 2023 11:09PM UTC coverage: 88.296% (-0.007%) from 88.303%
#18813

push

github-actions

web-flow
Handle exception in servers sendMessage better (v1.58 backport of #10513) (#10520)

* Log warning message when server side gets exception writing message to stream and allow multiple closes (#10513)

* Use internalClose instead of close when sendMessage has a RuntimeException.
* Change argument to internalClose to a Throwable instead of a Status.
* Rename internalClose to handleInternalError

* Update ServerCallImplTest.java

30334 of 34355 relevant lines covered (88.3%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.23
/../core/src/main/java/io/grpc/internal/ServerCallImpl.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.internal.GrpcAttributes.ATTR_SECURITY_LEVEL;
23
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
24
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
25
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
26
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Throwables;
30
import com.google.common.util.concurrent.MoreExecutors;
31
import io.grpc.Attributes;
32
import io.grpc.Codec;
33
import io.grpc.Compressor;
34
import io.grpc.CompressorRegistry;
35
import io.grpc.Context;
36
import io.grpc.DecompressorRegistry;
37
import io.grpc.InternalDecompressorRegistry;
38
import io.grpc.InternalStatus;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.SecurityLevel;
42
import io.grpc.ServerCall;
43
import io.grpc.Status;
44
import io.grpc.StatusRuntimeException;
45
import io.perfmark.PerfMark;
46
import io.perfmark.Tag;
47
import io.perfmark.TaskCloseable;
48
import java.io.InputStream;
49
import java.util.logging.Level;
50
import java.util.logging.Logger;
51

52
final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
53

54
  private static final Logger log = Logger.getLogger(ServerCallImpl.class.getName());
1✔
55

56
  @VisibleForTesting
57
  static final String TOO_MANY_RESPONSES = "Too many responses";
58
  @VisibleForTesting
59
  static final String MISSING_RESPONSE = "Completed without a response";
60

61
  private final ServerStream stream;
62
  private final MethodDescriptor<ReqT, RespT> method;
63
  private final Tag tag;
64
  private final Context.CancellableContext context;
65
  private final byte[] messageAcceptEncoding;
66
  private final DecompressorRegistry decompressorRegistry;
67
  private final CompressorRegistry compressorRegistry;
68
  private CallTracer serverCallTracer;
69

70
  // state
71
  private volatile boolean cancelled;
72
  private boolean sendHeadersCalled;
73
  private boolean closeCalled;
74
  private Compressor compressor;
75
  private boolean messageSent;
76

77
  ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
78
      Metadata inboundHeaders, Context.CancellableContext context,
79
      DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
80
      CallTracer serverCallTracer, Tag tag) {
1✔
81
    this.stream = stream;
1✔
82
    this.method = method;
1✔
83
    this.context = context;
1✔
84
    this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
85
    this.decompressorRegistry = decompressorRegistry;
1✔
86
    this.compressorRegistry = compressorRegistry;
1✔
87
    this.serverCallTracer = serverCallTracer;
1✔
88
    this.serverCallTracer.reportCallStarted();
1✔
89
    this.tag = tag;
1✔
90
  }
1✔
91

92
  @Override
93
  public void request(int numMessages) {
94
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.request")) {
1✔
95
      PerfMark.attachTag(tag);
1✔
96
      stream.request(numMessages);
1✔
97
    }
98
  }
1✔
99

100
  @Override
101
  public void sendHeaders(Metadata headers) {
102
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendHeaders")) {
1✔
103
      PerfMark.attachTag(tag);
1✔
104
      sendHeadersInternal(headers);
1✔
105
    }
106
  }
1✔
107

108
  private void sendHeadersInternal(Metadata headers) {
109
    checkState(!sendHeadersCalled, "sendHeaders has already been called");
1✔
110
    checkState(!closeCalled, "call is closed");
1✔
111

112
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
113
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
114
    if (compressor == null) {
1✔
115
      compressor = Codec.Identity.NONE;
1✔
116
    } else {
117
      if (messageAcceptEncoding != null) {
1✔
118
        // TODO(carl-mastrangelo): remove the string allocation.
119
        if (!GrpcUtil.iterableContains(
1✔
120
            ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)),
1✔
121
            compressor.getMessageEncoding())) {
1✔
122
          // resort to using no compression.
123
          compressor = Codec.Identity.NONE;
1✔
124
        }
125
      } else {
126
        compressor = Codec.Identity.NONE;
1✔
127
      }
128
    }
129

130
    // Always put compressor, even if it's identity.
131
    headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
132

133
    stream.setCompressor(compressor);
1✔
134

135
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
136
    byte[] advertisedEncodings =
1✔
137
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
138
    if (advertisedEncodings.length != 0) {
1✔
139
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
140
    }
141

142
    // Don't check if sendMessage has been called, since it requires that sendHeaders was already
143
    // called.
144
    sendHeadersCalled = true;
1✔
145
    stream.writeHeaders(headers);
1✔
146
  }
1✔
147

148
  @Override
149
  public void sendMessage(RespT message) {
150
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendMessage")) {
1✔
151
      PerfMark.attachTag(tag);
1✔
152
      sendMessageInternal(message);
1✔
153
    }
154
  }
1✔
155

156
  private void sendMessageInternal(RespT message) {
157
    checkState(sendHeadersCalled, "sendHeaders has not been called");
1✔
158
    checkState(!closeCalled, "call is closed");
1✔
159

160
    if (method.getType().serverSendsOneMessage() && messageSent) {
1✔
161
      handleInternalError(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES).asRuntimeException());
1✔
162
      return;
1✔
163
    }
164

165
    messageSent = true;
1✔
166
    try {
167
      InputStream resp = method.streamResponse(message);
1✔
168
      stream.writeMessage(resp);
1✔
169
      if (!getMethodDescriptor().getType().serverSendsOneMessage()) {
1✔
170
        stream.flush();
1✔
171
      }
172
    } catch (RuntimeException e) {
1✔
173
      handleInternalError(e);
1✔
174
    } catch (Error e) {
×
175
      close(
×
176
          Status.CANCELLED.withDescription("Server sendMessage() failed with Error"),
×
177
          new Metadata());
178
      throw e;
×
179
    }
1✔
180
  }
1✔
181

182
  @Override
183
  public void setMessageCompression(boolean enable) {
184
    stream.setMessageCompression(enable);
1✔
185
  }
1✔
186

187
  @Override
188
  public void setCompression(String compressorName) {
189
    // Added here to give a better error message.
190
    checkState(!sendHeadersCalled, "sendHeaders has been called");
1✔
191

192
    compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
193
    checkArgument(compressor != null, "Unable to find compressor by name %s", compressorName);
1✔
194
  }
1✔
195

196
  @Override
197
  public boolean isReady() {
198
    if (closeCalled) {
1✔
199
      return false;
1✔
200
    }
201
    return stream.isReady();
1✔
202
  }
203

204
  @Override
205
  public void close(Status status, Metadata trailers) {
206
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.close")) {
1✔
207
      PerfMark.attachTag(tag);
1✔
208
      closeInternal(status, trailers);
1✔
209
    }
210
  }
1✔
211

212
  private void closeInternal(Status status, Metadata trailers) {
213
    checkState(!closeCalled, "call already closed");
1✔
214
    try {
215
      closeCalled = true;
1✔
216

217
      if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
1✔
218
        handleInternalError(Status.INTERNAL.withDescription(MISSING_RESPONSE).asRuntimeException());
1✔
219
        return;
1✔
220
      }
221

222
      stream.close(status, trailers);
1✔
223
    } finally {
224
      serverCallTracer.reportCallEnded(status.isOk());
1✔
225
    }
226
  }
1✔
227

228
  @Override
229
  public boolean isCancelled() {
230
    return cancelled;
1✔
231
  }
232

233
  ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener) {
234
    return new ServerStreamListenerImpl<>(this, listener, context);
1✔
235
  }
236

237
  @Override
238
  public Attributes getAttributes() {
239
    return stream.getAttributes();
1✔
240
  }
241

242
  @Override
243
  public String getAuthority() {
244
    return stream.getAuthority();
1✔
245
  }
246

247
  @Override
248
  public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
249
    return method;
1✔
250
  }
251

252
  @Override
253
  public SecurityLevel getSecurityLevel() {
254
    final Attributes attributes = getAttributes();
1✔
255
    if (attributes == null) {
1✔
256
      return super.getSecurityLevel();
1✔
257
    }
258
    final SecurityLevel securityLevel = attributes.get(ATTR_SECURITY_LEVEL);
1✔
259
    return securityLevel == null ? super.getSecurityLevel() : securityLevel;
1✔
260
  }
261

262
  /**
263
   * Close the {@link ServerStream} because an internal error occurred. Allow the application to
264
   * run until completion, but silently ignore interactions with the {@link ServerStream} from now
265
   * on.
266
   */
267
  private void handleInternalError(Throwable internalError) {
268
    log.log(Level.WARNING, "Cancelling the stream because of internal error", internalError);
1✔
269
    Status status = (internalError instanceof StatusRuntimeException)
1✔
270
        ? ((StatusRuntimeException) internalError).getStatus()
1✔
271
        : Status.INTERNAL.withCause(internalError)
1✔
272
            .withDescription("Internal error so cancelling stream.");
1✔
273
    stream.cancel(status);
1✔
274
    serverCallTracer.reportCallEnded(false); // error so always false
1✔
275
  }
1✔
276

277
  /**
278
   * All of these callbacks are assumed to called on an application thread, and the caller is
279
   * responsible for handling thrown exceptions.
280
   */
281
  @VisibleForTesting
282
  static final class ServerStreamListenerImpl<ReqT> implements ServerStreamListener {
283
    private final ServerCallImpl<ReqT, ?> call;
284
    private final ServerCall.Listener<ReqT> listener;
285
    private final Context.CancellableContext context;
286

287
    public ServerStreamListenerImpl(
288
        ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener,
289
        Context.CancellableContext context) {
1✔
290
      this.call = checkNotNull(call, "call");
1✔
291
      this.listener = checkNotNull(listener, "listener must not be null");
1✔
292
      this.context = checkNotNull(context, "context");
1✔
293
      // Wire ourselves up so that if the context is cancelled, our flag call.cancelled also
294
      // reflects the new state. Use a DirectExecutor so that it happens in the same thread
295
      // as the caller of {@link Context#cancel}.
296
      this.context.addListener(
1✔
297
          new Context.CancellationListener() {
1✔
298
            @Override
299
            public void cancelled(Context context) {
300
              // If the context has a cancellation cause then something exceptional happened
301
              // and we should also mark the call as cancelled.
302
              if (context.cancellationCause() != null) {
1✔
303
                ServerStreamListenerImpl.this.call.cancelled = true;
1✔
304
              }
305
            }
1✔
306
          },
307
          MoreExecutors.directExecutor());
1✔
308
    }
1✔
309

310
    @Override
311
    public void messagesAvailable(MessageProducer producer) {
312
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
1✔
313
        PerfMark.attachTag(call.tag);
1✔
314
        messagesAvailableInternal(producer);
1✔
315
      }
316
    }
1✔
317

318
    @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
319
    private void messagesAvailableInternal(final MessageProducer producer) {
320
      if (call.cancelled) {
1✔
321
        GrpcUtil.closeQuietly(producer);
1✔
322
        return;
1✔
323
      }
324

325
      InputStream message;
326
      try {
327
        while ((message = producer.next()) != null) {
1✔
328
          try {
329
            listener.onMessage(call.method.parseRequest(message));
1✔
330
          } catch (Throwable t) {
1✔
331
            GrpcUtil.closeQuietly(message);
1✔
332
            throw t;
1✔
333
          }
1✔
334
          message.close();
1✔
335
        }
336
      } catch (Throwable t) {
1✔
337
        GrpcUtil.closeQuietly(producer);
1✔
338
        Throwables.throwIfUnchecked(t);
×
339
        throw new RuntimeException(t);
×
340
      }
1✔
341
    }
1✔
342

343
    @Override
344
    public void halfClosed() {
345
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
1✔
346
        PerfMark.attachTag(call.tag);
1✔
347
        if (call.cancelled) {
1✔
348
          return;
1✔
349
        }
350

351
        listener.onHalfClose();
1✔
352
      }
353
    }
1✔
354

355
    @Override
356
    public void closed(Status status) {
357
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
1✔
358
        PerfMark.attachTag(call.tag);
1✔
359
        closedInternal(status);
1✔
360
      }
361
    }
1✔
362

363
    private void closedInternal(Status status) {
364
      Throwable cancelCause = null;
1✔
365
      try {
366
        if (status.isOk()) {
1✔
367
          listener.onComplete();
1✔
368
        } else {
369
          call.cancelled = true;
1✔
370
          listener.onCancel();
1✔
371
          // The status will not have a cause in all failure scenarios but we want to make sure
372
          // we always cancel the context with one to keep the context cancelled state consistent.
373
          cancelCause = InternalStatus.asRuntimeException(
1✔
374
              Status.CANCELLED.withDescription("RPC cancelled"), null, false);
1✔
375
        }
376
      } finally {
377
        // Cancel context after delivering RPC closure notification to allow the application to
378
        // clean up and update any state based on whether onComplete or onCancel was called.
379
        context.cancel(cancelCause);
1✔
380
      }
381
    }
1✔
382

383
    @Override
384
    public void onReady() {
385
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
1✔
386
        PerfMark.attachTag(call.tag);
1✔
387
        if (call.cancelled) {
1✔
388
          return;
1✔
389
        }
390
        listener.onReady();
1✔
391
      }
392
    }
1✔
393
  }
394
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc