• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19774

08 Apr 2025 10:30AM UTC coverage: 88.597% (+0.002%) from 88.595%
#19774

push

github

web-flow
stub: trailersFromThrowable() metadata should be copied (#11979) (#12008)

If the same exception is passed to multiple RPCs, then the results will race, which this fix addresses.

Fixes #11973

Co-authored-by: jiangyuan <joe469391363@gmail.com>

34615 of 39070 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.45
/../stub/src/main/java/io/grpc/stub/ServerCalls.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Preconditions;
24
import io.grpc.Metadata;
25
import io.grpc.MethodDescriptor;
26
import io.grpc.ServerCall;
27
import io.grpc.ServerCallHandler;
28
import io.grpc.Status;
29

30
/**
31
 * Utility functions for adapting {@link ServerCallHandler}s to application service implementation,
32
 * meant to be used by the generated code.
33
 */
34
public final class ServerCalls {
35

36
  @VisibleForTesting
37
  static final String TOO_MANY_REQUESTS = "Too many requests";
38
  @VisibleForTesting
39
  static final String MISSING_REQUEST = "Half-closed without a request";
40

41
  /** Not instantiable: this class only exposes static members. */
  private ServerCalls() {
    // Intentionally empty.
  }
43

44
  /**
45
   * Creates a {@link ServerCallHandler} for a unary call method of the service.
46
   *
47
   * @param method an adaptor to the actual method on the service implementation.
48
   */
49
  public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
50
      UnaryMethod<ReqT, RespT> method) {
51
    return new UnaryServerCallHandler<>(method, false);
1✔
52
  }
53

54
  /**
55
   * Creates a {@link ServerCallHandler} for a server streaming method of the service.
56
   *
57
   * @param method an adaptor to the actual method on the service implementation.
58
   */
59
  public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
60
      ServerStreamingMethod<ReqT, RespT> method) {
61
    return new UnaryServerCallHandler<>(method, true);
1✔
62
  }
63

64
  /**
65
   * Creates a {@link ServerCallHandler} for a client streaming method of the service.
66
   *
67
   * @param method an adaptor to the actual method on the service implementation.
68
   */
69
  public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
70
      ClientStreamingMethod<ReqT, RespT> method) {
71
    return new StreamingServerCallHandler<>(method, false);
1✔
72
  }
73

74
  /**
75
   * Creates a {@link ServerCallHandler} for a bidi streaming method of the service.
76
   *
77
   * @param method an adaptor to the actual method on the service implementation.
78
   */
79
  public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
80
      BidiStreamingMethod<ReqT, RespT> method) {
81
    return new StreamingServerCallHandler<>(method, true);
1✔
82
  }
83

84
  /**
85
   * Adaptor to a unary call method.
86
   */
87
  public interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {
88
    @Override void invoke(ReqT request, StreamObserver<RespT> responseObserver);
89
  }
90

91
  /**
92
   * Adaptor to a server streaming method.
93
   */
94
  public interface ServerStreamingMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {
95
    @Override void invoke(ReqT request, StreamObserver<RespT> responseObserver);
96
  }
97

98
  /**
99
   * Adaptor to a client streaming method.
100
   */
101
  public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {
102
    @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
103
  }
104

105
  /**
106
   * Adaptor to a bidirectional streaming method.
107
   */
108
  public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {
109
    @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
110
  }
111

112
  private static final class UnaryServerCallHandler<ReqT, RespT>
113
      implements ServerCallHandler<ReqT, RespT> {
114

115
    private final UnaryRequestMethod<ReqT, RespT> method;
116
    private final boolean serverStreaming;
117

118
    // Non private to avoid synthetic class
119
    UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method, boolean serverStreaming) {
1✔
120
      this.method = method;
1✔
121
      this.serverStreaming = serverStreaming;
1✔
122
    }
1✔
123

124
    @Override
125
    public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
126
      Preconditions.checkArgument(
1✔
127
          call.getMethodDescriptor().getType().clientSendsOneMessage(),
1✔
128
          "asyncUnaryRequestCall is only for clientSendsOneMessage methods");
129
      ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
1✔
130
          new ServerCallStreamObserverImpl<>(call, serverStreaming);
131
      // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
132
      // sends more than 1 requests, ServerCall will catch it. Note that disabling auto
133
      // inbound flow control has no effect on unary calls.
134
      call.request(2);
1✔
135
      return new UnaryServerCallListener(responseObserver, call);
1✔
136
    }
137

138
    private final class UnaryServerCallListener extends ServerCall.Listener<ReqT> {
139
      private final ServerCall<ReqT, RespT> call;
140
      private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
141
      private boolean canInvoke = true;
1✔
142
      private boolean wasReady;
143
      private ReqT request;
144

145
      // Non private to avoid synthetic class
146
      UnaryServerCallListener(
147
          ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
148
          ServerCall<ReqT, RespT> call) {
1✔
149
        this.call = call;
1✔
150
        this.responseObserver = responseObserver;
1✔
151
      }
1✔
152

153
      @Override
154
      public void onMessage(ReqT request) {
155
        if (this.request != null) {
1✔
156
          // Safe to close the call, because the application has not yet been invoked
157
          call.close(
1✔
158
              Status.INTERNAL.withDescription(TOO_MANY_REQUESTS),
1✔
159
              new Metadata());
160
          canInvoke = false;
1✔
161
          return;
1✔
162
        }
163

164
        // We delay calling method.invoke() until onHalfClose() to make sure the client
165
        // half-closes.
166
        this.request = request;
1✔
167
      }
1✔
168

169
      @Override
170
      public void onHalfClose() {
171
        if (!canInvoke) {
1✔
172
          return;
1✔
173
        }
174
        if (request == null) {
1✔
175
          // Safe to close the call, because the application has not yet been invoked
176
          call.close(
1✔
177
              Status.INTERNAL.withDescription(MISSING_REQUEST),
1✔
178
              new Metadata());
179
          return;
1✔
180
        }
181

182
        method.invoke(request, responseObserver);
1✔
183
        request = null;
1✔
184
        responseObserver.freeze();
1✔
185
        if (wasReady) {
1✔
186
          // Since we are calling invoke in halfClose we have missed the onReady
187
          // event from the transport so recover it here.
188
          onReady();
1✔
189
        }
190
      }
1✔
191

192
      @Override
193
      public void onCancel() {
194
        if (responseObserver.onCancelHandler != null) {
1✔
195
          responseObserver.onCancelHandler.run();
1✔
196
        } else {
197
          // Only trigger exceptions if unable to provide notification via a callback
198
          responseObserver.cancelled = true;
1✔
199
        }
200
      }
1✔
201

202
      @Override
203
      public void onReady() {
204
        wasReady = true;
1✔
205
        if (responseObserver.onReadyHandler != null) {
1✔
206
          responseObserver.onReadyHandler.run();
1✔
207
        }
208
      }
1✔
209

210
      @Override
211
      public void onComplete() {
212
        if (responseObserver.onCloseHandler != null) {
1✔
213
          responseObserver.onCloseHandler.run();
1✔
214
        }
215
      }
1✔
216
    }
217
  }
218

219
  private static final class StreamingServerCallHandler<ReqT, RespT>
220
      implements ServerCallHandler<ReqT, RespT> {
221

222
    private final StreamingRequestMethod<ReqT, RespT> method;
223
    private final boolean bidi;
224

225
    // Non private to avoid synthetic class
226
    StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method, boolean bidi) {
1✔
227
      this.method = method;
1✔
228
      this.bidi = bidi;
1✔
229
    }
1✔
230

231
    @Override
232
    public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
233
      ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
1✔
234
          new ServerCallStreamObserverImpl<>(call, bidi);
235
      StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
1✔
236
      responseObserver.freeze();
1✔
237
      if (responseObserver.autoRequestEnabled) {
1✔
238
        call.request(1);
1✔
239
      }
240
      return new StreamingServerCallListener(requestObserver, responseObserver, call);
1✔
241
    }
242

243
    private final class StreamingServerCallListener extends ServerCall.Listener<ReqT> {
244

245
      private final StreamObserver<ReqT> requestObserver;
246
      private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
247
      private final ServerCall<ReqT, RespT> call;
248
      private boolean halfClosed = false;
1✔
249

250
      // Non private to avoid synthetic class
251
      StreamingServerCallListener(
252
          StreamObserver<ReqT> requestObserver,
253
          ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
254
          ServerCall<ReqT, RespT> call) {
1✔
255
        this.requestObserver = requestObserver;
1✔
256
        this.responseObserver = responseObserver;
1✔
257
        this.call = call;
1✔
258
      }
1✔
259

260
      @Override
261
      public void onMessage(ReqT request) {
262
        requestObserver.onNext(request);
1✔
263

264
        // Request delivery of the next inbound message.
265
        if (responseObserver.autoRequestEnabled) {
1✔
266
          call.request(1);
1✔
267
        }
268
      }
1✔
269

270
      @Override
271
      public void onHalfClose() {
272
        halfClosed = true;
1✔
273
        requestObserver.onCompleted();
1✔
274
      }
1✔
275

276
      @Override
277
      public void onCancel() {
278
        if (responseObserver.onCancelHandler != null) {
1✔
279
          responseObserver.onCancelHandler.run();
1✔
280
        } else {
281
          // Only trigger exceptions if unable to provide notification via a callback. Even though
282
          // onError would provide notification to the server, we still throw an error since there
283
          // isn't a guaranteed callback available. If the cancellation happened in a different
284
          // order the service could be surprised to see the exception.
285
          responseObserver.cancelled = true;
1✔
286
        }
287
        if (!halfClosed) {
1✔
288
          requestObserver.onError(
1✔
289
              Status.CANCELLED
290
                  .withDescription("client cancelled")
1✔
291
                  .asRuntimeException());
1✔
292
        }
293
      }
1✔
294

295
      @Override
296
      public void onReady() {
297
        if (responseObserver.onReadyHandler != null) {
1✔
298
          responseObserver.onReadyHandler.run();
1✔
299
        }
300
      }
1✔
301

302
      @Override
303
      public void onComplete() {
304
        if (responseObserver.onCloseHandler != null) {
1✔
305
          responseObserver.onCloseHandler.run();
1✔
306
        }
307
      }
1✔
308
    }
309
  }
310

311
  private interface UnaryRequestMethod<ReqT, RespT> {
312
    /**
313
     * The provided {@code responseObserver} will extend {@link ServerCallStreamObserver}.
314
     */
315
    void invoke(ReqT request, StreamObserver<RespT> responseObserver);
316
  }
317

318
  private interface StreamingRequestMethod<ReqT, RespT> {
319
    /**
320
     * The provided {@code responseObserver} will extend {@link ServerCallStreamObserver}.
321
     */
322
    StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
323
  }
324

325
  private static final class ServerCallStreamObserverImpl<ReqT, RespT>
326
      extends ServerCallStreamObserver<RespT> {
327
    final ServerCall<ReqT, RespT> call;
328
    private final boolean serverStreamingOrBidi;
329
    volatile boolean cancelled;
330
    private boolean frozen;
331
    private boolean autoRequestEnabled = true;
1✔
332
    private boolean sentHeaders;
333
    private Runnable onReadyHandler;
334
    private Runnable onCancelHandler;
335
    private boolean aborted = false;
1✔
336
    private boolean completed = false;
1✔
337
    private Runnable onCloseHandler;
338

339
    // Non private to avoid synthetic class
340
    ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
1✔
341
      this.call = call;
1✔
342
      this.serverStreamingOrBidi = serverStreamingOrBidi;
1✔
343
    }
1✔
344

345
    private void freeze() {
346
      this.frozen = true;
1✔
347
    }
1✔
348

349
    @Override
350
    public void setMessageCompression(boolean enable) {
351
      call.setMessageCompression(enable);
×
352
    }
×
353

354
    @Override
355
    public void setCompression(String compression) {
356
      call.setCompression(compression);
1✔
357
    }
1✔
358

359
    @Override
360
    public void onNext(RespT response) {
361
      if (cancelled) {
1✔
362
        if (serverStreamingOrBidi) {
1✔
363
          throw Status.CANCELLED
1✔
364
              .withDescription("call already cancelled. "
1✔
365
                  + "Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception")
366
              .asRuntimeException();
1✔
367
        } else {
368
          // We choose not to throw for unary responses. The exception is intended to stop servers
369
          // from continuing processing, but for unary responses there is no further processing
370
          // so throwing an exception would not provide a benefit and would increase application
371
          // complexity.
372
        }
373
      }
374
      checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
1✔
375
      checkState(!completed, "Stream is already completed, no further calls are allowed");
1✔
376
      if (!sentHeaders) {
1✔
377
        call.sendHeaders(new Metadata());
1✔
378
        sentHeaders = true;
1✔
379
      }
380
      call.sendMessage(response);
1✔
381
    }
1✔
382

383
    @Override
384
    public void onError(Throwable t) {
385
      Metadata metadata = new Metadata();
1✔
386
      Metadata trailers = Status.trailersFromThrowable(t);
1✔
387
      if (trailers != null) {
1✔
388
        metadata.merge(trailers);
1✔
389
      }
390
      call.close(Status.fromThrowable(t), metadata);
1✔
391
      aborted = true;
1✔
392
    }
1✔
393

394
    @Override
395
    public void onCompleted() {
396
      call.close(Status.OK, new Metadata());
1✔
397
      completed = true;
1✔
398
    }
1✔
399
    
400
    @Override
401
    public boolean isReady() {
402
      return call.isReady();
1✔
403
    }
404

405
    @Override
406
    public void setOnReadyHandler(Runnable r) {
407
      checkState(!frozen, "Cannot alter onReadyHandler after initialization. May only be called "
1✔
408
          + "during the initial call to the application, before the service returns its "
409
          + "StreamObserver");
410
      this.onReadyHandler = r;
1✔
411
    }
1✔
412

413
    @Override
414
    public boolean isCancelled() {
415
      return call.isCancelled();
1✔
416
    }
417

418
    @Override
419
    public void setOnCancelHandler(Runnable onCancelHandler) {
420
      checkState(!frozen, "Cannot alter onCancelHandler after initialization. May only be called "
1✔
421
          + "during the initial call to the application, before the service returns its "
422
          + "StreamObserver");
423
      this.onCancelHandler = onCancelHandler;
1✔
424
    }
1✔
425

426
    @Override
427
    public void setOnReadyThreshold(int numBytes) {
428
      checkState(!frozen, "Cannot alter setOnReadyThreshold after initialization. May only be "
1✔
429
          + "called during the initial call to the application, before the service returns its "
430
          + "StreamObserver");
431
      call.setOnReadyThreshold(numBytes);
1✔
432
    }
1✔
433

434
    @Override
435
    public void disableAutoInboundFlowControl() {
436
      disableAutoRequest();
×
437
    }
×
438

439
    @Override
440
    public void disableAutoRequest() {
441
      checkState(!frozen, "Cannot disable auto flow control after initialization");
1✔
442
      autoRequestEnabled = false;
1✔
443
    }
1✔
444

445
    @Override
446
    public void request(int count) {
447
      call.request(count);
1✔
448
    }
1✔
449

450
    @Override
451
    public void setOnCloseHandler(Runnable onCloseHandler) {
452
      checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called "
1✔
453
          + "during the initial call to the application, before the service returns its "
454
          + "StreamObserver");
455
      this.onCloseHandler = onCloseHandler;
1✔
456
    }
1✔
457
  }
458

459
  /**
460
   * Sets unimplemented status for method on given response stream for unary call.
461
   *
462
   * @param methodDescriptor of method for which error will be thrown.
463
   * @param responseObserver on which error will be set.
464
   */
465
  public static void asyncUnimplementedUnaryCall(
466
      MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
467
    checkNotNull(methodDescriptor, "methodDescriptor");
1✔
468
    checkNotNull(responseObserver, "responseObserver");
1✔
469
    responseObserver.onError(Status.UNIMPLEMENTED
1✔
470
        .withDescription(String.format("Method %s is unimplemented",
1✔
471
            methodDescriptor.getFullMethodName()))
1✔
472
        .asRuntimeException());
1✔
473
  }
1✔
474

475
  /**
476
   * Sets unimplemented status for streaming call.
477
   *
478
   * @param methodDescriptor of method for which error will be thrown.
479
   * @param responseObserver on which error will be set.
480
   */
481
  public static <ReqT> StreamObserver<ReqT> asyncUnimplementedStreamingCall(
482
      MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
483
    // NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error
484
    // on responseObserver and then return no-op observer.
485
    asyncUnimplementedUnaryCall(methodDescriptor, responseObserver);
×
486
    return new NoopStreamObserver<>();
×
487
  }
488

489
  /**
490
   * No-op implementation of StreamObserver. Used in abstract stubs for default implementations of
491
   * methods which throws UNIMPLEMENTED error and tests.
492
   */
493
  static class NoopStreamObserver<V> implements StreamObserver<V> {
1✔
494
    @Override
495
    public void onNext(V value) {
496
    }
1✔
497

498
    @Override
499
    public void onError(Throwable t) {
500
    }
1✔
501

502
    @Override
503
    public void onCompleted() {
504
    }
×
505
  }
506
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc