• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19995

22 Sep 2025 09:51AM UTC coverage: 88.543% (+0.006%) from 88.537%
#19995

push

github

web-flow
stub: remove interrupt checking from ThreadSafeThreadlessExecutor.drain (#12358)

The only caller of `drain()` just reset the interrupt flag. So, we can
avoid the whole exception dance.

34661 of 39146 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.34
/../stub/src/main/java/io/grpc/stub/ClientCalls.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Predicate;
26
import com.google.common.base.Strings;
27
import com.google.common.util.concurrent.AbstractFuture;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import io.grpc.CallOptions;
30
import io.grpc.Channel;
31
import io.grpc.ClientCall;
32
import io.grpc.ExperimentalApi;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.Status;
36
import io.grpc.StatusException;
37
import io.grpc.StatusRuntimeException;
38
import java.util.Iterator;
39
import java.util.NoSuchElementException;
40
import java.util.concurrent.ArrayBlockingQueue;
41
import java.util.concurrent.BlockingQueue;
42
import java.util.concurrent.ConcurrentLinkedQueue;
43
import java.util.concurrent.ExecutionException;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.RejectedExecutionException;
47
import java.util.concurrent.TimeoutException;
48
import java.util.concurrent.locks.Condition;
49
import java.util.concurrent.locks.Lock;
50
import java.util.concurrent.locks.LockSupport;
51
import java.util.concurrent.locks.ReentrantLock;
52
import java.util.logging.Level;
53
import java.util.logging.Logger;
54
import javax.annotation.Nonnull;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Utility functions for processing different call idioms. We have one-to-one correspondence
59
 * between utilities in this class and the potential signatures in a generated stub class so
60
 * that the runtime can vary behavior without requiring regeneration of the stub.
61
 */
62
public final class ClientCalls {
63

64
  private static final Logger logger = Logger.getLogger(ClientCalls.class.getName());

  /**
   * Initialized once, at class load, from the {@code GRPC_CLIENT_CALL_REJECT_RUNNABLE}
   * environment variable. {@link Boolean#parseBoolean(String)} already yields {@code false} for a
   * missing or empty value, so no separate null/empty check is needed.
   */
  @VisibleForTesting
  static boolean rejectRunnableOnExecutor =
      Boolean.parseBoolean(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE"));

  // Utility holder: instances are never created.
  private ClientCalls() {}
73

74
  /**
75
   * Executes a unary call with a response {@link StreamObserver}.  The {@code call} should not be
76
   * already started.  After calling this method, {@code call} should no longer be used.
77
   *
78
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
79
   * {@code beforeStart()} will be called.
80
   */
81
  public static <ReqT, RespT> void asyncUnaryCall(
82
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
83
    checkNotNull(responseObserver, "responseObserver");
1✔
84
    asyncUnaryRequestCall(call, req, responseObserver, false);
1✔
85
  }
1✔
86

87
  /**
88
   * Executes a server-streaming call with a response {@link StreamObserver}.  The {@code call}
89
   * should not be already started.  After calling this method, {@code call} should no longer be
90
   * used.
91
   *
92
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
93
   * {@code beforeStart()} will be called.
94
   */
95
  public static <ReqT, RespT> void asyncServerStreamingCall(
96
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
97
    checkNotNull(responseObserver, "responseObserver");
1✔
98
    asyncUnaryRequestCall(call, req, responseObserver, true);
1✔
99
  }
1✔
100

101
  /**
102
   * Executes a client-streaming call returning a {@link StreamObserver} for the request messages.
103
   * The {@code call} should not be already started.  After calling this method, {@code call}
104
   * should no longer be used.
105
   *
106
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
107
   * {@code beforeStart()} will be called.
108
   *
109
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
110
   */
111
  public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
112
      ClientCall<ReqT, RespT> call,
113
      StreamObserver<RespT> responseObserver) {
114
    checkNotNull(responseObserver, "responseObserver");
1✔
115
    return asyncStreamingRequestCall(call, responseObserver, false);
1✔
116
  }
117

118
  /**
119
   * Executes a bidirectional-streaming call.  The {@code call} should not be already started.
120
   * After calling this method, {@code call} should no longer be used.
121
   *
122
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
123
   * {@code beforeStart()} will be called.
124
   *
125
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
126
   */
127
  public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
128
      ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
129
    checkNotNull(responseObserver, "responseObserver");
1✔
130
    return asyncStreamingRequestCall(call, responseObserver, true);
1✔
131
  }
132

133
  /**
134
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
135
   * started.  After calling this method, {@code call} should no longer be used.
136
   *
137
   * @return the single response message.
138
   * @throws StatusRuntimeException on error
139
   */
140
  public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
141
    try {
142
      return getUnchecked(futureUnaryCall(call, req));
1✔
143
    } catch (RuntimeException | Error e) {
1✔
144
      throw cancelThrow(call, e);
×
145
    }
146
  }
147

148
  /**
149
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
150
   * started.  After calling this method, {@code call} should no longer be used.
151
   *
152
   * @return the single response message.
153
   * @throws StatusRuntimeException on error
154
   */
155
  public static <ReqT, RespT> RespT blockingUnaryCall(
156
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
157
    ThreadlessExecutor executor = new ThreadlessExecutor();
1✔
158
    boolean interrupt = false;
1✔
159
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
160
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
1✔
161
            .withExecutor(executor));
1✔
162
    try {
163
      ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
1✔
164
      while (!responseFuture.isDone()) {
1✔
165
        try {
166
          executor.waitAndDrain();
1✔
167
        } catch (InterruptedException e) {
1✔
168
          interrupt = true;
1✔
169
          call.cancel("Thread interrupted", e);
1✔
170
          // Now wait for onClose() to be called, so interceptors can clean up
171
        }
1✔
172
      }
173
      executor.shutdown();
1✔
174
      return getUnchecked(responseFuture);
1✔
175
    } catch (RuntimeException | Error e) {
1✔
176
      // Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
177
      throw cancelThrow(call, e);
×
178
    } finally {
179
      if (interrupt) {
1✔
180
        Thread.currentThread().interrupt();
1✔
181
      }
182
    }
183
  }
184

185
  /**
186
   * Executes a unary call and blocks on the response,
187
   * throws a checked {@link StatusException}.
188
   *
189
   * @return the single response message.
190
   * @throws StatusException on error
191
   */
192
  public static <ReqT, RespT> RespT blockingV2UnaryCall(
193
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req)
194
      throws StatusException {
195
    try {
196
      return blockingUnaryCall(channel, method, callOptions, req);
×
197
    } catch (StatusRuntimeException e) {
×
198
      throw e.getStatus().asException(e.getTrailers());
×
199
    }
200
  }
201

202
  /**
203
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
204
   * response stream.  The {@code call} should not be already started.  After calling this method,
205
   * {@code call} should no longer be used.
206
   *
207
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
208
   *
209
   * @return an iterator over the response stream.
210
   */
211
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
212
      ClientCall<ReqT, RespT> call, ReqT req) {
213
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
1✔
214
    asyncUnaryRequestCall(call, req, result.listener());
1✔
215
    return result;
1✔
216
  }
217

218
  /**
219
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
220
   * response stream.
221
   *
222
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
223
   *
224
   * <p>Warning:  the iterator can result in leaks if not completely consumed.
225
   *
226
   * @return an iterator over the response stream.
227
   */
228
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
229
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
230
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
231
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));
1✔
232

233
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
1✔
234
    asyncUnaryRequestCall(call, req, result.listener());
1✔
235
    return result;
1✔
236
  }
237

238
  /**
239
   * Initiates a client streaming call over the specified channel.  It returns an
240
   * object which can be used in a blocking manner to retrieve responses..
241
   *
242
   * <p>The methods {@link BlockingClientCall#hasNext()} and {@link
243
   * BlockingClientCall#cancel(String, Throwable)} can be used for more extensive control.
244
   *
245
   * @return A {@link BlockingClientCall} that has had the request sent and halfClose called
246
   */
247
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
248
  public static <ReqT, RespT> BlockingClientCall<ReqT, RespT> blockingV2ServerStreamingCall(
249
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
250
    BlockingClientCall<ReqT, RespT> call =
×
251
        blockingBidiStreamingCall(channel, method, callOptions);
×
252

253
    call.sendSingleRequest(req);
×
254
    call.halfClose();
×
255
    return call;
×
256
  }
257

258
  /**
259
   * Initiates a server streaming call and sends the specified request to the server.  It returns an
260
   * object which can be used in a blocking manner to retrieve values from the server.  After the
261
   * last value has been read, the next read call will return null.
262
   *
263
   * <p>Call {@link BlockingClientCall#read()} for
264
   * retrieving values.  A {@code null} will be returned after the server has closed the stream.
265
   *
266
   * <p>The methods {@link BlockingClientCall#hasNext()} and {@link
267
   * BlockingClientCall#cancel(String, Throwable)} can be used for more extensive control.
268
   *
269
   * <p><br> Example usage:
270
   * <pre> {@code  while ((response = call.read()) != null) { ... } } </pre>
271
   * or
272
   * <pre> {@code
273
   *   while (call.hasNext()) {
274
   *     response = call.read();
275
   *     ...
276
   *   }
277
   * } </pre>
278
   *
279
   * <p>Note that this paradigm is different from the original
280
   * {@link #blockingServerStreamingCall(Channel, MethodDescriptor, CallOptions, Object)}
281
   * which returns an iterator, which would leave the stream open if not completely consumed.
282
   *
283
   * @return A {@link BlockingClientCall} which can be used by the client to write and receive
284
   *     messages over the grpc channel.
285
   */
286
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
287
  public static <ReqT, RespT> BlockingClientCall<ReqT, RespT> blockingClientStreamingCall(
288
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
289
    return blockingBidiStreamingCall(channel, method, callOptions);
×
290
  }
291

292
  /**
293
   * Initiate a bidirectional-streaming {@link ClientCall} and returning a stream object
294
   * ({@link BlockingClientCall}) which can be used by the client to send and receive messages over
295
   * the grpc channel.
296
   *
297
   * @return an object representing the call which can be used to read, write and terminate it.
298
   */
299
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
300
  public static <ReqT, RespT> BlockingClientCall<ReqT, RespT> blockingBidiStreamingCall(
301
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
302
    ThreadSafeThreadlessExecutor executor = new ThreadSafeThreadlessExecutor();
1✔
303
    ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
1✔
304

305
    BlockingClientCall<ReqT, RespT> blockingClientCall = new BlockingClientCall<>(call, executor);
1✔
306

307
    // Get the call started
308
    call.start(blockingClientCall.getListener(), new Metadata());
1✔
309
    call.request(1);
1✔
310

311
    return blockingClientCall;
1✔
312
  }
313

314
  /**
315
   * Executes a unary call and returns a {@link ListenableFuture} to the response.  The
316
   * {@code call} should not be already started.  After calling this method, {@code call} should no
317
   * longer be used.
318
   *
319
   * @return a future for the single response message.
320
   */
321
  public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
322
      ClientCall<ReqT, RespT> call, ReqT req) {
323
    GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
1✔
324
    asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
1✔
325
    return responseFuture;
1✔
326
  }
327

328
  /**
329
   * Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a
330
   * checked exception.
331
   *
332
   * <p>If interrupted, the interrupt is restored before throwing an exception..
333
   *
334
   * @throws java.util.concurrent.CancellationException
335
   *     if {@code get} throws a {@code CancellationException}.
336
   * @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException}
337
   *     or an {@link InterruptedException}.
338
   */
339
  private static <V> V getUnchecked(Future<V> future) {
340
    try {
341
      return future.get();
1✔
342
    } catch (InterruptedException e) {
×
343
      Thread.currentThread().interrupt();
×
344
      throw Status.CANCELLED
×
345
          .withDescription("Thread interrupted")
×
346
          .withCause(e)
×
347
          .asRuntimeException();
×
348
    } catch (ExecutionException e) {
1✔
349
      throw toStatusRuntimeException(e.getCause());
1✔
350
    }
351
  }
352

353
  /**
354
   * Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an
355
   * embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will
356
   * contain the embedded trailers and status, with the given exception as the cause. Otherwise, an
357
   * exception will be generated from an {@link Status#UNKNOWN} status.
358
   */
359
  private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
360
    Throwable cause = checkNotNull(t, "t");
1✔
361
    while (cause != null) {
1✔
362
      // If we have an embedded status, use it and replace the cause
363
      if (cause instanceof StatusException) {
1✔
364
        StatusException se = (StatusException) cause;
×
365
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
×
366
      } else if (cause instanceof StatusRuntimeException) {
1✔
367
        StatusRuntimeException se = (StatusRuntimeException) cause;
1✔
368
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
1✔
369
      }
370
      cause = cause.getCause();
×
371
    }
372
    return Status.UNKNOWN.withDescription("unexpected exception").withCause(t)
×
373
        .asRuntimeException();
×
374
  }
375

376
  /**
377
   * Cancels a call, and throws the exception.
378
   *
379
   * @param t must be a RuntimeException or Error
380
   */
381
  private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
382
    try {
383
      call.cancel(null, t);
1✔
384
    } catch (RuntimeException | Error e) {
×
385
      logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
×
386
    }
1✔
387
    if (t instanceof RuntimeException) {
1✔
388
      throw (RuntimeException) t;
1✔
389
    } else if (t instanceof Error) {
×
390
      throw (Error) t;
×
391
    }
392
    // should be impossible
393
    throw new AssertionError(t);
×
394
  }
395

396
  private static <ReqT, RespT> void asyncUnaryRequestCall(
397
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver,
398
      boolean streamingResponse) {
399
    asyncUnaryRequestCall(
1✔
400
        call,
401
        req,
402
        new StreamObserverToCallListenerAdapter<>(
403
            responseObserver,
404
            new CallToStreamObserverAdapter<>(call, streamingResponse)));
405
  }
1✔
406

407
  private static <ReqT, RespT> void asyncUnaryRequestCall(
408
      ClientCall<ReqT, RespT> call,
409
      ReqT req,
410
      StartableListener<RespT> responseListener) {
411
    startCall(call, responseListener);
1✔
412
    try {
413
      call.sendMessage(req);
1✔
414
      call.halfClose();
1✔
415
    } catch (RuntimeException | Error e) {
×
416
      throw cancelThrow(call, e);
×
417
    }
1✔
418
  }
1✔
419

420
  private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
421
      ClientCall<ReqT, RespT> call,
422
      StreamObserver<RespT> responseObserver,
423
      boolean streamingResponse) {
424
    CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(
1✔
425
        call, streamingResponse);
426
    startCall(
1✔
427
        call,
428
        new StreamObserverToCallListenerAdapter<>(responseObserver, adapter));
429
    return adapter;
1✔
430
  }
431

432
  private static <ReqT, RespT> void startCall(
433
      ClientCall<ReqT, RespT> call,
434
      StartableListener<RespT> responseListener) {
435
    call.start(responseListener, new Metadata());
1✔
436
    responseListener.onStart();
1✔
437
  }
1✔
438

439
  private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
440
    abstract void onStart();
441
  }
442

443
  private static final class CallToStreamObserverAdapter<ReqT>
444
      extends ClientCallStreamObserver<ReqT> {
445
    private boolean frozen;
446
    private final ClientCall<ReqT, ?> call;
447
    private final boolean streamingResponse;
448
    private Runnable onReadyHandler;
449
    private int initialRequest = 1;
1✔
450
    private boolean autoRequestEnabled = true;
1✔
451
    private boolean aborted = false;
1✔
452
    private boolean completed = false;
1✔
453

454
    // Non private to avoid synthetic class
455
    CallToStreamObserverAdapter(ClientCall<ReqT, ?> call, boolean streamingResponse) {
1✔
456
      this.call = call;
1✔
457
      this.streamingResponse = streamingResponse;
1✔
458
    }
1✔
459

460
    private void freeze() {
461
      this.frozen = true;
1✔
462
    }
1✔
463

464
    @Override
465
    public void onNext(ReqT value) {
466
      checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
1✔
467
      checkState(!completed, "Stream is already completed, no further calls are allowed");
1✔
468
      call.sendMessage(value);
1✔
469
    }
1✔
470

471
    @Override
472
    public void onError(Throwable t) {
473
      call.cancel("Cancelled by client with StreamObserver.onError()", t);
1✔
474
      aborted = true;
1✔
475
    }
1✔
476

477
    @Override
478
    public void onCompleted() {
479
      call.halfClose();
1✔
480
      completed = true;
1✔
481
    }
1✔
482

483
    @Override
484
    public boolean isReady() {
485
      return call.isReady();
1✔
486
    }
487

488
    @Override
489
    public void setOnReadyHandler(Runnable onReadyHandler) {
490
      if (frozen) {
1✔
491
        throw new IllegalStateException(
1✔
492
            "Cannot alter onReadyHandler after call started. Use ClientResponseObserver");
493
      }
494
      this.onReadyHandler = onReadyHandler;
1✔
495
    }
1✔
496

497
    @Override
498
    public void disableAutoInboundFlowControl() {
499
      disableAutoRequestWithInitial(1);
×
500
    }
×
501

502
    @Override
503
    public void disableAutoRequestWithInitial(int request) {
504
      if (frozen) {
1✔
505
        throw new IllegalStateException(
×
506
            "Cannot disable auto flow control after call started. Use ClientResponseObserver");
507
      }
508
      Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
1✔
509
      initialRequest = request;
1✔
510
      autoRequestEnabled = false;
1✔
511
    }
1✔
512

513
    @Override
514
    public void request(int count) {
515
      if (!streamingResponse && count == 1) {
1✔
516
        // Initially ask for two responses from flow-control so that if a misbehaving server
517
        // sends more than one response, we can catch it and fail it in the listener.
518
        call.request(2);
1✔
519
      } else {
520
        call.request(count);
1✔
521
      }
522
    }
1✔
523

524
    @Override
525
    public void setMessageCompression(boolean enable) {
526
      call.setMessageCompression(enable);
×
527
    }
×
528

529
    @Override
530
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
531
      call.cancel(message, cause);
×
532
    }
×
533
  }
534

535
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
536
      extends StartableListener<RespT> {
537
    private final StreamObserver<RespT> observer;
538
    private final CallToStreamObserverAdapter<ReqT> adapter;
539
    private boolean firstResponseReceived;
540

541
    // Non private to avoid synthetic class
542
    StreamObserverToCallListenerAdapter(
543
        StreamObserver<RespT> observer,
544
        CallToStreamObserverAdapter<ReqT> adapter) {
1✔
545
      this.observer = observer;
1✔
546
      this.adapter = adapter;
1✔
547
      if (observer instanceof ClientResponseObserver) {
1✔
548
        @SuppressWarnings("unchecked")
549
        ClientResponseObserver<ReqT, RespT> clientResponseObserver =
1✔
550
            (ClientResponseObserver<ReqT, RespT>) observer;
551
        clientResponseObserver.beforeStart(adapter);
1✔
552
      }
553
      adapter.freeze();
1✔
554
    }
1✔
555

556
    @Override
557
    public void onHeaders(Metadata headers) {
558
    }
1✔
559

560
    @Override
561
    public void onMessage(RespT message) {
562
      if (firstResponseReceived && !adapter.streamingResponse) {
1✔
563
        throw Status.INTERNAL
×
564
            .withDescription("More than one responses received for unary or client-streaming call")
×
565
            .asRuntimeException();
×
566
      }
567
      firstResponseReceived = true;
1✔
568
      observer.onNext(message);
1✔
569

570
      if (adapter.streamingResponse && adapter.autoRequestEnabled) {
1✔
571
        // Request delivery of the next inbound message.
572
        adapter.request(1);
1✔
573
      }
574
    }
1✔
575

576
    @Override
577
    public void onClose(Status status, Metadata trailers) {
578
      if (status.isOk()) {
1✔
579
        observer.onCompleted();
1✔
580
      } else {
581
        observer.onError(status.asRuntimeException(trailers));
1✔
582
      }
583
    }
1✔
584

585
    @Override
586
    public void onReady() {
587
      if (adapter.onReadyHandler != null) {
1✔
588
        adapter.onReadyHandler.run();
1✔
589
      }
590
    }
1✔
591

592
    @Override
593
    void onStart() {
594
      if (adapter.initialRequest > 0) {
1✔
595
        adapter.request(adapter.initialRequest);
1✔
596
      }
597
    }
1✔
598
  }
599

600
  /**
601
   * Completes a {@link GrpcFuture} using {@link StreamObserver} events.
602
   */
603
  private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
604
    private final GrpcFuture<RespT> responseFuture;
605
    private RespT value;
606
    private boolean isValueReceived = false;
1✔
607

608
    // Non private to avoid synthetic class
609
    UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) {
1✔
610
      this.responseFuture = responseFuture;
1✔
611
    }
1✔
612

613
    @Override
614
    public void onHeaders(Metadata headers) {
615
    }
1✔
616

617
    @Override
618
    public void onMessage(RespT value) {
619
      if (this.isValueReceived) {
1✔
620
        throw Status.INTERNAL.withDescription("More than one value received for unary call")
×
621
            .asRuntimeException();
×
622
      }
623
      this.value = value;
1✔
624
      this.isValueReceived = true;
1✔
625
    }
1✔
626

627
    @Override
628
    public void onClose(Status status, Metadata trailers) {
629
      if (status.isOk()) {
1✔
630
        if (!isValueReceived) {
1✔
631
          // No value received so mark the future as an error
632
          responseFuture.setException(
×
633
              Status.INTERNAL.withDescription("No value received for unary call")
×
634
                  .asRuntimeException(trailers));
×
635
        }
636
        responseFuture.set(value);
1✔
637
      } else {
638
        responseFuture.setException(status.asRuntimeException(trailers));
1✔
639
      }
640
    }
1✔
641

642
    @Override
643
    void onStart() {
644
      responseFuture.call.request(2);
1✔
645
    }
1✔
646
  }
647

648
  private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
649
    private final ClientCall<?, RespT> call;
650

651
    // Non private to avoid synthetic class
652
    GrpcFuture(ClientCall<?, RespT> call) {
1✔
653
      this.call = call;
1✔
654
    }
1✔
655

656
    @Override
657
    protected void interruptTask() {
658
      call.cancel("GrpcFuture was cancelled", null);
1✔
659
    }
1✔
660

661
    @Override
662
    protected boolean set(@Nullable RespT resp) {
663
      return super.set(resp);
1✔
664
    }
665

666
    @Override
667
    protected boolean setException(Throwable throwable) {
668
      return super.setException(throwable);
1✔
669
    }
670

671
    @SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped
672
    protected String pendingToString() {
673
      return MoreObjects.toStringHelper(this).add("clientCall", call).toString();
×
674
    }
675
  }
676

677
  /**
678
   * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}.
679
   *
680
   * <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
681
   * separate thread from {@link Iterator} calls.
682
   */
683
  // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
684
  private static final class BlockingResponseStream<T> implements Iterator<T> {
685
    // Due to flow control, only needs to hold up to 3 items: 2 for value, 1 for close.
686
    // (2 for value, not 1, because of early request() in next())
687
    private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
1✔
688
    private final StartableListener<T> listener = new QueuingListener();
1✔
689
    private final ClientCall<?, T> call;
690
    // Only accessed when iterating.
691
    private Object last;
692

693
    // Non private to avoid synthetic class
694
    BlockingResponseStream(ClientCall<?, T> call) {
1✔
695
      this.call = call;
1✔
696
    }
1✔
697

698
    StartableListener<T> listener() {
699
      return listener;
1✔
700
    }
701

702
    private Object waitForNext() {
703
      boolean interrupt = false;
1✔
704
      try {
705
        while (true) {
706
          try {
707
            return buffer.take();
1✔
708
          } catch (InterruptedException ie) {
1✔
709
            interrupt = true;
1✔
710
            call.cancel("Thread interrupted", ie);
1✔
711
            // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
712
          }
1✔
713
        }
714
      } finally {
715
        if (interrupt) {
1✔
716
          Thread.currentThread().interrupt();
1✔
717
        }
718
      }
719
    }
720

721
    @Override
722
    public boolean hasNext() {
723
      while (last == null) {
1✔
724
        // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
725
        // hangs here as the call will become closed.
726
        last = waitForNext();
1✔
727
      }
728
      if (last instanceof StatusRuntimeException) {
1✔
729
        // Rethrow the exception with a new stacktrace.
730
        StatusRuntimeException e = (StatusRuntimeException) last;
1✔
731
        throw e.getStatus().asRuntimeException(e.getTrailers());
1✔
732
      }
733
      return last != this;
1✔
734
    }
735

736
    @Override
737
    public T next() {
738
      // Eagerly call request(1) so it can be processing the next message while we wait for the
739
      // current one, which reduces latency for the next message. With MigratingThreadDeframer and
740
      // if the data has already been received, every other message can be delivered instantly. This
741
      // can be run after hasNext(), but just would be slower.
742
      if (!(last instanceof StatusRuntimeException) && last != this) {
1✔
743
        call.request(1);
1✔
744
      }
745
      if (!hasNext()) {
1✔
746
        throw new NoSuchElementException();
×
747
      }
748
      @SuppressWarnings("unchecked")
749
      T tmp = (T) last;
1✔
750
      last = null;
1✔
751
      return tmp;
1✔
752
    }
753

754
    @Override
755
    public void remove() {
756
      throw new UnsupportedOperationException();
×
757
    }
758

759
    private final class QueuingListener extends StartableListener<T> {
760
      // Non private to avoid synthetic class
761
      QueuingListener() {}
1✔
762

763
      private boolean done = false;
1✔
764

765
      @Override
766
      public void onHeaders(Metadata headers) {
767
      }
1✔
768

769
      @Override
770
      public void onMessage(T value) {
771
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
772
        buffer.add(value);
1✔
773
      }
1✔
774

775
      @Override
776
      public void onClose(Status status, Metadata trailers) {
777
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
778
        if (status.isOk()) {
1✔
779
          buffer.add(BlockingResponseStream.this);
1✔
780
        } else {
781
          buffer.add(status.asRuntimeException(trailers));
1✔
782
        }
783
        done = true;
1✔
784
      }
1✔
785

786
      @Override
787
      void onStart() {
788
        call.request(1);
1✔
789
      }
1✔
790
    }
791
  }
792

793
  @SuppressWarnings("serial")
794
  private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
795
      implements Executor {
796
    private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
1✔
797

798
    private static final Object SHUTDOWN = new Object(); // sentinel
1✔
799

800
    // Set to the calling thread while it's parked, SHUTDOWN on RPC completion
801
    private volatile Object waiter;
802

803
    // Non private to avoid synthetic class
804
    ThreadlessExecutor() {}
1✔
805

806
    /**
807
     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
808
     * Must only be called by one thread at a time.
809
     */
810
    public void waitAndDrain() throws InterruptedException {
811
      throwIfInterrupted();
1✔
812
      Runnable runnable = poll();
1✔
813
      if (runnable == null) {
1✔
814
        waiter = Thread.currentThread();
1✔
815
        try {
816
          while ((runnable = poll()) == null) {
1✔
817
            LockSupport.park(this);
1✔
818
            throwIfInterrupted();
1✔
819
          }
820
        } finally {
821
          waiter = null;
1✔
822
        }
823
      }
824
      do {
825
        runQuietly(runnable);
1✔
826
      } while ((runnable = poll()) != null);
1✔
827
    }
1✔
828

829
    private static void throwIfInterrupted() throws InterruptedException {
830
      if (Thread.interrupted()) {
1✔
831
        throw new InterruptedException();
1✔
832
      }
833
    }
1✔
834

835
    /**
836
     * Called after final call to {@link #waitAndDrain()}, from same thread.
837
     */
838
    public void shutdown() {
839
      waiter = SHUTDOWN;
1✔
840
      Runnable runnable;
841
      while ((runnable = poll()) != null) {
1✔
842
        runQuietly(runnable);
×
843
      }
844
    }
1✔
845

846
    private static void runQuietly(Runnable runnable) {
847
      try {
848
        runnable.run();
1✔
849
      } catch (Throwable t) {
×
850
        log.log(Level.WARNING, "Runnable threw exception", t);
×
851
      }
1✔
852
    }
1✔
853

854
    @Override
855
    public void execute(Runnable runnable) {
856
      add(runnable);
1✔
857
      Object waiter = this.waiter;
1✔
858
      if (waiter != SHUTDOWN) {
1✔
859
        LockSupport.unpark((Thread) waiter); // no-op if null
1✔
860
      } else if (remove(runnable) && rejectRunnableOnExecutor) {
1✔
861
        throw new RejectedExecutionException();
1✔
862
      }
863
    }
1✔
864
  }
865

866
  @SuppressWarnings("serial")
867
  static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
868
      implements Executor {
869
    private static final Logger log =
1✔
870
        Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());
1✔
871

872
    private final Lock waiterLock = new ReentrantLock();
1✔
873
    private final Condition waiterCondition = waiterLock.newCondition();
1✔
874

875
    // Non private to avoid synthetic class
876
    ThreadSafeThreadlessExecutor() {}
1✔
877

878
    /**
879
     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
880
     */
881
    public <T> void waitAndDrain(Predicate<T> predicate, T testTarget) throws InterruptedException {
882
      try {
883
        waitAndDrainWithTimeout(true, 0, predicate, testTarget);
×
884
      } catch (TimeoutException e) {
×
885
        throw new AssertionError(e); // Should never happen
×
886
      }
×
887
    }
×
888

889
    /**
890
     * Waits for up to specified nanoseconds until there is a Runnable, then executes it and all
891
     * queued Runnables after it.
892
     *
893
     * <p>his should always be called in a loop that checks whether the reason we are waiting has
894
     * been satisfied.</p>T
895
     *
896
     * @param waitForever ignore the rest of the arguments and wait until there is a task to run
897
     * @param end System.nanoTime() to stop waiting if haven't been woken up yet
898
     * @param predicate non-null condition to test for skipping wake or waking up threads
899
     * @param testTarget object to pass to predicate
900
     */
901
    public <T> void waitAndDrainWithTimeout(boolean waitForever, long end,
902
                                            @Nonnull Predicate<T> predicate, T testTarget)
903
        throws InterruptedException, TimeoutException {
904
      throwIfInterrupted();
1✔
905
      Runnable runnable;
906

907
      while (!predicate.apply(testTarget)) {
1✔
908
        waiterLock.lock();
1✔
909
        try {
910
          while ((runnable = poll()) == null) {
1✔
911
            if (predicate.apply(testTarget)) {
1✔
912
              return; // The condition for which we were waiting is now satisfied
×
913
            }
914

915
            if (waitForever) {
1✔
916
              waiterCondition.await();
1✔
917
            } else {
918
              long waitNanos = end - System.nanoTime();
1✔
919
              if (waitNanos <= 0) {
1✔
920
                throw new TimeoutException(); // Deadline is expired
1✔
921
              }
922
              waiterCondition.awaitNanos(waitNanos);
1✔
923
            }
1✔
924
          }
925
        } finally {
926
          waiterLock.unlock();
1✔
927
        }
928

929
        do {
930
          runQuietly(runnable);
1✔
931
        } while ((runnable = poll()) != null);
1✔
932
        // Wake everything up now that we've done something and they can check in their outer loop
933
        // if they can continue or need to wait again.
934
        signalAll();
1✔
935
      }
936
    }
1✔
937

938
    /** Executes all queued Runnables and if there were any wakes up any waiting threads. */
939
    void drain() {
940
      Runnable runnable;
941
      boolean didWork = false;
1✔
942

943
      while ((runnable = poll()) != null) {
1✔
944
        runQuietly(runnable);
1✔
945
        didWork = true;
1✔
946
      }
947

948
      if (didWork) {
1✔
949
        signalAll();
1✔
950
      }
951
    }
1✔
952

953
    private void signalAll() {
954
      waiterLock.lock();
1✔
955
      try {
956
        waiterCondition.signalAll();
1✔
957
      } finally {
958
        waiterLock.unlock();
1✔
959
      }
960
    }
1✔
961

962
    private static void runQuietly(Runnable runnable) {
963
      try {
964
        runnable.run();
1✔
965
      } catch (Throwable t) {
×
966
        log.log(Level.WARNING, "Runnable threw exception", t);
×
967
      }
1✔
968
    }
1✔
969

970
    private static void throwIfInterrupted() throws InterruptedException {
971
      if (Thread.interrupted()) {
1✔
972
        throw new InterruptedException();
×
973
      }
974
    }
1✔
975

976
    @Override
977
    public void execute(Runnable runnable) {
978
      waiterLock.lock();
1✔
979
      try {
980
        add(runnable);
1✔
981
        waiterCondition.signalAll(); // If anything is waiting let it wake up and process this task
1✔
982
      } finally {
983
        waiterLock.unlock();
1✔
984
      }
985
    }
1✔
986
  }
987

988
  /** The stub types that can be recorded under {@code STUB_TYPE_OPTION}. */
  enum StubType {
    BLOCKING,
    FUTURE,
    ASYNC
  }
991

992
  /**
993
   * Internal {@link CallOptions.Key} to indicate stub types.
994
   */
995
  static final CallOptions.Key<StubType> STUB_TYPE_OPTION =
1✔
996
      CallOptions.Key.create("internal-stub-type");
1✔
997
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc