• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19602

21 Dec 2024 12:16AM UTC coverage: 88.552% (-0.04%) from 88.593%
#19602

push

github

web-flow
Bidi Blocking Stub (#10318)

33618 of 37964 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.13
/../stub/src/main/java/io/grpc/stub/ClientCalls.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Predicate;
26
import com.google.common.base.Strings;
27
import com.google.common.util.concurrent.AbstractFuture;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import io.grpc.CallOptions;
30
import io.grpc.Channel;
31
import io.grpc.ClientCall;
32
import io.grpc.ExperimentalApi;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.Status;
36
import io.grpc.StatusException;
37
import io.grpc.StatusRuntimeException;
38
import java.util.Iterator;
39
import java.util.NoSuchElementException;
40
import java.util.concurrent.ArrayBlockingQueue;
41
import java.util.concurrent.BlockingQueue;
42
import java.util.concurrent.ConcurrentLinkedQueue;
43
import java.util.concurrent.ExecutionException;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.RejectedExecutionException;
47
import java.util.concurrent.TimeoutException;
48
import java.util.concurrent.locks.Condition;
49
import java.util.concurrent.locks.Lock;
50
import java.util.concurrent.locks.LockSupport;
51
import java.util.concurrent.locks.ReentrantLock;
52
import java.util.logging.Level;
53
import java.util.logging.Logger;
54
import javax.annotation.Nonnull;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Utility functions for processing different call idioms. We have one-to-one correspondence
59
 * between utilities in this class and the potential signatures in a generated stub class so
60
 * that the runtime can vary behavior without requiring regeneration of the stub.
61
 */
62
public final class ClientCalls {
63

64
  private static final Logger logger = Logger.getLogger(ClientCalls.class.getName());
1✔
65

66
  @VisibleForTesting
67
  static boolean rejectRunnableOnExecutor =
1✔
68
      !Strings.isNullOrEmpty(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE"))
1✔
69
          && Boolean.parseBoolean(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE"));
1✔
70

71
  // Prevent instantiation
72
  private ClientCalls() {}
73

74
  /**
75
   * Executes a unary call with a response {@link StreamObserver}.  The {@code call} should not be
76
   * already started.  After calling this method, {@code call} should no longer be used.
77
   *
78
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
79
   * {@code beforeStart()} will be called.
80
   */
81
  public static <ReqT, RespT> void asyncUnaryCall(
82
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
83
    checkNotNull(responseObserver, "responseObserver");
1✔
84
    asyncUnaryRequestCall(call, req, responseObserver, false);
1✔
85
  }
1✔
86

87
  /**
88
   * Executes a server-streaming call with a response {@link StreamObserver}.  The {@code call}
89
   * should not be already started.  After calling this method, {@code call} should no longer be
90
   * used.
91
   *
92
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
93
   * {@code beforeStart()} will be called.
94
   */
95
  public static <ReqT, RespT> void asyncServerStreamingCall(
96
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
97
    checkNotNull(responseObserver, "responseObserver");
1✔
98
    asyncUnaryRequestCall(call, req, responseObserver, true);
1✔
99
  }
1✔
100

101
  /**
102
   * Executes a client-streaming call returning a {@link StreamObserver} for the request messages.
103
   * The {@code call} should not be already started.  After calling this method, {@code call}
104
   * should no longer be used.
105
   *
106
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
107
   * {@code beforeStart()} will be called.
108
   *
109
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
110
   */
111
  public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
112
      ClientCall<ReqT, RespT> call,
113
      StreamObserver<RespT> responseObserver) {
114
    checkNotNull(responseObserver, "responseObserver");
1✔
115
    return asyncStreamingRequestCall(call, responseObserver, false);
1✔
116
  }
117

118
  /**
119
   * Executes a bidirectional-streaming call.  The {@code call} should not be already started.
120
   * After calling this method, {@code call} should no longer be used.
121
   *
122
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
123
   * {@code beforeStart()} will be called.
124
   *
125
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
126
   */
127
  public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
128
      ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
129
    checkNotNull(responseObserver, "responseObserver");
1✔
130
    return asyncStreamingRequestCall(call, responseObserver, true);
1✔
131
  }
132

133
  /**
134
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
135
   * started.  After calling this method, {@code call} should no longer be used.
136
   *
137
   * @return the single response message.
138
   * @throws StatusRuntimeException on error
139
   */
140
  public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
141
    try {
142
      return getUnchecked(futureUnaryCall(call, req));
1✔
143
    } catch (RuntimeException | Error e) {
1✔
144
      throw cancelThrow(call, e);
×
145
    }
146
  }
147

148
  /**
149
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
150
   * started.  After calling this method, {@code call} should no longer be used.
151
   *
152
   * @return the single response message.
153
   * @throws StatusRuntimeException on error
154
   */
155
  public static <ReqT, RespT> RespT blockingUnaryCall(
156
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
157
    ThreadlessExecutor executor = new ThreadlessExecutor();
1✔
158
    boolean interrupt = false;
1✔
159
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
160
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
1✔
161
            .withExecutor(executor));
1✔
162
    try {
163
      ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
1✔
164
      while (!responseFuture.isDone()) {
1✔
165
        try {
166
          executor.waitAndDrain();
1✔
167
        } catch (InterruptedException e) {
1✔
168
          interrupt = true;
1✔
169
          call.cancel("Thread interrupted", e);
1✔
170
          // Now wait for onClose() to be called, so interceptors can clean up
171
        }
1✔
172
      }
173
      executor.shutdown();
1✔
174
      return getUnchecked(responseFuture);
1✔
175
    } catch (RuntimeException | Error e) {
1✔
176
      // Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
177
      throw cancelThrow(call, e);
×
178
    } finally {
179
      if (interrupt) {
1✔
180
        Thread.currentThread().interrupt();
1✔
181
      }
182
    }
183
  }
184

185
  /**
186
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
187
   * response stream.  The {@code call} should not be already started.  After calling this method,
188
   * {@code call} should no longer be used.
189
   *
190
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
191
   *
192
   * @return an iterator over the response stream.
193
   */
194
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
195
      ClientCall<ReqT, RespT> call, ReqT req) {
196
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
1✔
197
    asyncUnaryRequestCall(call, req, result.listener());
1✔
198
    return result;
1✔
199
  }
200

201
  /**
202
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
203
   * response stream.
204
   *
205
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
206
   *
207
   * <p>Warning:  the iterator can result in leaks if not completely consumed.
208
   *
209
   * @return an iterator over the response stream.
210
   */
211
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
212
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
213
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
214
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));
1✔
215

216
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
1✔
217
    asyncUnaryRequestCall(call, req, result.listener());
1✔
218
    return result;
1✔
219
  }
220

221
  /**
222
   * Initiates a client streaming call over the specified channel.  It returns an
223
   * object which can be used in a blocking manner to retrieve responses..
224
   *
225
   * <p>The methods {@link BlockingClientCall#hasNext()} and {@link
226
   * BlockingClientCall#cancel(String, Throwable)} can be used for more extensive control.
227
   *
228
   * @return A {@link BlockingClientCall} that has had the request sent and halfClose called
229
   */
230
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
231
  public static <ReqT, RespT> BlockingClientCall<ReqT, RespT> blockingV2ServerStreamingCall(
232
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
233
    BlockingClientCall<ReqT, RespT> call =
×
234
        blockingBidiStreamingCall(channel, method, callOptions);
×
235

236
    call.sendSingleRequest(req);
×
237
    call.halfClose();
×
238
    return call;
×
239
  }
240

241
  /**
242
   * Initiates a server streaming call and sends the specified request to the server.  It returns an
243
   * object which can be used in a blocking manner to retrieve values from the server.  After the
244
   * last value has been read, the next read call will return null.
245
   *
246
   * <p>Call {@link BlockingClientCall#read()} for
247
   * retrieving values.  A {@code null} will be returned after the server has closed the stream.
248
   *
249
   * <p>The methods {@link BlockingClientCall#hasNext()} and {@link
250
   * BlockingClientCall#cancel(String, Throwable)} can be used for more extensive control.
251
   *
252
   * <p><br> Example usage:
253
   * <pre> {@code  while ((response = call.read()) != null) { ... } } </pre>
254
   * or
255
   * <pre> {@code
256
   *   while (call.hasNext()) {
257
   *     response = call.read();
258
   *     ...
259
   *   }
260
   * } </pre>
261
   *
262
   * <p>Note that this paradigm is different from the original
263
   * {@link #blockingServerStreamingCall(Channel, MethodDescriptor, CallOptions, Object)}
264
   * which returns an iterator, which would leave the stream open if not completely consumed.
265
   *
266
   * @return A {@link BlockingClientCall} which can be used by the client to write and receive
267
   *     messages over the grpc channel.
268
   */
269
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
270
  public static <ReqT, RespT> BlockingClientCall<ReqT, RespT> blockingClientStreamingCall(
271
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
272
    return blockingBidiStreamingCall(channel, method, callOptions);
×
273
  }
274

275
  /**
276
   * Initiate a bidirectional-streaming {@link ClientCall} and returning a stream object
277
   * ({@link BlockingClientCall}) which can be used by the client to send and receive messages over
278
   * the grpc channel.
279
   *
280
   * @return an object representing the call which can be used to read, write and terminate it.
281
   */
282
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
283
  public static <ReqT, RespT> BlockingClientCall<ReqT, RespT> blockingBidiStreamingCall(
284
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
285
    ThreadSafeThreadlessExecutor executor = new ThreadSafeThreadlessExecutor();
1✔
286
    ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
1✔
287

288
    BlockingClientCall<ReqT, RespT> blockingClientCall = new BlockingClientCall<>(call, executor);
1✔
289

290
    // Get the call started
291
    call.start(blockingClientCall.getListener(), new Metadata());
1✔
292
    call.request(1);
1✔
293

294
    return blockingClientCall;
1✔
295
  }
296

297
  /**
298
   * Executes a unary call and returns a {@link ListenableFuture} to the response.  The
299
   * {@code call} should not be already started.  After calling this method, {@code call} should no
300
   * longer be used.
301
   *
302
   * @return a future for the single response message.
303
   */
304
  public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
305
      ClientCall<ReqT, RespT> call, ReqT req) {
306
    GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
1✔
307
    asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
1✔
308
    return responseFuture;
1✔
309
  }
310

311
  /**
312
   * Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a
313
   * checked exception.
314
   *
315
   * <p>If interrupted, the interrupt is restored before throwing an exception..
316
   *
317
   * @throws java.util.concurrent.CancellationException
318
   *     if {@code get} throws a {@code CancellationException}.
319
   * @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException}
320
   *     or an {@link InterruptedException}.
321
   */
322
  private static <V> V getUnchecked(Future<V> future) {
323
    try {
324
      return future.get();
1✔
325
    } catch (InterruptedException e) {
×
326
      Thread.currentThread().interrupt();
×
327
      throw Status.CANCELLED
×
328
          .withDescription("Thread interrupted")
×
329
          .withCause(e)
×
330
          .asRuntimeException();
×
331
    } catch (ExecutionException e) {
1✔
332
      throw toStatusRuntimeException(e.getCause());
1✔
333
    }
334
  }
335

336
  /**
337
   * Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an
338
   * embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will
339
   * contain the embedded trailers and status, with the given exception as the cause. Otherwise, an
340
   * exception will be generated from an {@link Status#UNKNOWN} status.
341
   */
342
  private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
343
    Throwable cause = checkNotNull(t, "t");
1✔
344
    while (cause != null) {
1✔
345
      // If we have an embedded status, use it and replace the cause
346
      if (cause instanceof StatusException) {
1✔
347
        StatusException se = (StatusException) cause;
×
348
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
×
349
      } else if (cause instanceof StatusRuntimeException) {
1✔
350
        StatusRuntimeException se = (StatusRuntimeException) cause;
1✔
351
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
1✔
352
      }
353
      cause = cause.getCause();
×
354
    }
355
    return Status.UNKNOWN.withDescription("unexpected exception").withCause(t)
×
356
        .asRuntimeException();
×
357
  }
358

359
  /**
360
   * Cancels a call, and throws the exception.
361
   *
362
   * @param t must be a RuntimeException or Error
363
   */
364
  private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
365
    try {
366
      call.cancel(null, t);
1✔
367
    } catch (RuntimeException | Error e) {
×
368
      logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
×
369
    }
1✔
370
    if (t instanceof RuntimeException) {
1✔
371
      throw (RuntimeException) t;
1✔
372
    } else if (t instanceof Error) {
×
373
      throw (Error) t;
×
374
    }
375
    // should be impossible
376
    throw new AssertionError(t);
×
377
  }
378

379
  private static <ReqT, RespT> void asyncUnaryRequestCall(
380
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver,
381
      boolean streamingResponse) {
382
    asyncUnaryRequestCall(
1✔
383
        call,
384
        req,
385
        new StreamObserverToCallListenerAdapter<>(
386
            responseObserver,
387
            new CallToStreamObserverAdapter<>(call, streamingResponse)));
388
  }
1✔
389

390
  private static <ReqT, RespT> void asyncUnaryRequestCall(
391
      ClientCall<ReqT, RespT> call,
392
      ReqT req,
393
      StartableListener<RespT> responseListener) {
394
    startCall(call, responseListener);
1✔
395
    try {
396
      call.sendMessage(req);
1✔
397
      call.halfClose();
1✔
398
    } catch (RuntimeException | Error e) {
×
399
      throw cancelThrow(call, e);
×
400
    }
1✔
401
  }
1✔
402

403
  private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
404
      ClientCall<ReqT, RespT> call,
405
      StreamObserver<RespT> responseObserver,
406
      boolean streamingResponse) {
407
    CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(
1✔
408
        call, streamingResponse);
409
    startCall(
1✔
410
        call,
411
        new StreamObserverToCallListenerAdapter<>(responseObserver, adapter));
412
    return adapter;
1✔
413
  }
414

415
  private static <ReqT, RespT> void startCall(
416
      ClientCall<ReqT, RespT> call,
417
      StartableListener<RespT> responseListener) {
418
    call.start(responseListener, new Metadata());
1✔
419
    responseListener.onStart();
1✔
420
  }
1✔
421

422
  private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
423
    abstract void onStart();
424
  }
425

426
  private static final class CallToStreamObserverAdapter<ReqT>
427
      extends ClientCallStreamObserver<ReqT> {
428
    private boolean frozen;
429
    private final ClientCall<ReqT, ?> call;
430
    private final boolean streamingResponse;
431
    private Runnable onReadyHandler;
432
    private int initialRequest = 1;
1✔
433
    private boolean autoRequestEnabled = true;
1✔
434
    private boolean aborted = false;
1✔
435
    private boolean completed = false;
1✔
436

437
    // Non private to avoid synthetic class
438
    CallToStreamObserverAdapter(ClientCall<ReqT, ?> call, boolean streamingResponse) {
1✔
439
      this.call = call;
1✔
440
      this.streamingResponse = streamingResponse;
1✔
441
    }
1✔
442

443
    private void freeze() {
444
      this.frozen = true;
1✔
445
    }
1✔
446

447
    @Override
448
    public void onNext(ReqT value) {
449
      checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
1✔
450
      checkState(!completed, "Stream is already completed, no further calls are allowed");
1✔
451
      call.sendMessage(value);
1✔
452
    }
1✔
453

454
    @Override
455
    public void onError(Throwable t) {
456
      call.cancel("Cancelled by client with StreamObserver.onError()", t);
1✔
457
      aborted = true;
1✔
458
    }
1✔
459

460
    @Override
461
    public void onCompleted() {
462
      call.halfClose();
1✔
463
      completed = true;
1✔
464
    }
1✔
465

466
    @Override
467
    public boolean isReady() {
468
      return call.isReady();
1✔
469
    }
470

471
    @Override
472
    public void setOnReadyHandler(Runnable onReadyHandler) {
473
      if (frozen) {
1✔
474
        throw new IllegalStateException(
1✔
475
            "Cannot alter onReadyHandler after call started. Use ClientResponseObserver");
476
      }
477
      this.onReadyHandler = onReadyHandler;
1✔
478
    }
1✔
479

480
    @Override
481
    public void disableAutoInboundFlowControl() {
482
      disableAutoRequestWithInitial(1);
×
483
    }
×
484

485
    @Override
486
    public void disableAutoRequestWithInitial(int request) {
487
      if (frozen) {
1✔
488
        throw new IllegalStateException(
×
489
            "Cannot disable auto flow control after call started. Use ClientResponseObserver");
490
      }
491
      Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
1✔
492
      initialRequest = request;
1✔
493
      autoRequestEnabled = false;
1✔
494
    }
1✔
495

496
    @Override
497
    public void request(int count) {
498
      if (!streamingResponse && count == 1) {
1✔
499
        // Initially ask for two responses from flow-control so that if a misbehaving server
500
        // sends more than one response, we can catch it and fail it in the listener.
501
        call.request(2);
1✔
502
      } else {
503
        call.request(count);
1✔
504
      }
505
    }
1✔
506

507
    @Override
508
    public void setMessageCompression(boolean enable) {
509
      call.setMessageCompression(enable);
×
510
    }
×
511

512
    @Override
513
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
514
      call.cancel(message, cause);
×
515
    }
×
516
  }
517

518
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
519
      extends StartableListener<RespT> {
520
    private final StreamObserver<RespT> observer;
521
    private final CallToStreamObserverAdapter<ReqT> adapter;
522
    private boolean firstResponseReceived;
523

524
    // Non private to avoid synthetic class
525
    StreamObserverToCallListenerAdapter(
526
        StreamObserver<RespT> observer,
527
        CallToStreamObserverAdapter<ReqT> adapter) {
1✔
528
      this.observer = observer;
1✔
529
      this.adapter = adapter;
1✔
530
      if (observer instanceof ClientResponseObserver) {
1✔
531
        @SuppressWarnings("unchecked")
532
        ClientResponseObserver<ReqT, RespT> clientResponseObserver =
1✔
533
            (ClientResponseObserver<ReqT, RespT>) observer;
534
        clientResponseObserver.beforeStart(adapter);
1✔
535
      }
536
      adapter.freeze();
1✔
537
    }
1✔
538

539
    @Override
540
    public void onHeaders(Metadata headers) {
541
    }
1✔
542

543
    @Override
544
    public void onMessage(RespT message) {
545
      if (firstResponseReceived && !adapter.streamingResponse) {
1✔
546
        throw Status.INTERNAL
×
547
            .withDescription("More than one responses received for unary or client-streaming call")
×
548
            .asRuntimeException();
×
549
      }
550
      firstResponseReceived = true;
1✔
551
      observer.onNext(message);
1✔
552

553
      if (adapter.streamingResponse && adapter.autoRequestEnabled) {
1✔
554
        // Request delivery of the next inbound message.
555
        adapter.request(1);
1✔
556
      }
557
    }
1✔
558

559
    @Override
560
    public void onClose(Status status, Metadata trailers) {
561
      if (status.isOk()) {
1✔
562
        observer.onCompleted();
1✔
563
      } else {
564
        observer.onError(status.asRuntimeException(trailers));
1✔
565
      }
566
    }
1✔
567

568
    @Override
569
    public void onReady() {
570
      if (adapter.onReadyHandler != null) {
1✔
571
        adapter.onReadyHandler.run();
1✔
572
      }
573
    }
1✔
574

575
    @Override
576
    void onStart() {
577
      if (adapter.initialRequest > 0) {
1✔
578
        adapter.request(adapter.initialRequest);
1✔
579
      }
580
    }
1✔
581
  }
582

583
  /**
584
   * Completes a {@link GrpcFuture} using {@link StreamObserver} events.
585
   */
586
  private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
587
    private final GrpcFuture<RespT> responseFuture;
588
    private RespT value;
589
    private boolean isValueReceived = false;
1✔
590

591
    // Non private to avoid synthetic class
592
    UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) {
1✔
593
      this.responseFuture = responseFuture;
1✔
594
    }
1✔
595

596
    @Override
597
    public void onHeaders(Metadata headers) {
598
    }
1✔
599

600
    @Override
601
    public void onMessage(RespT value) {
602
      if (this.isValueReceived) {
1✔
603
        throw Status.INTERNAL.withDescription("More than one value received for unary call")
×
604
            .asRuntimeException();
×
605
      }
606
      this.value = value;
1✔
607
      this.isValueReceived = true;
1✔
608
    }
1✔
609

610
    @Override
611
    public void onClose(Status status, Metadata trailers) {
612
      if (status.isOk()) {
1✔
613
        if (!isValueReceived) {
1✔
614
          // No value received so mark the future as an error
615
          responseFuture.setException(
×
616
              Status.INTERNAL.withDescription("No value received for unary call")
×
617
                  .asRuntimeException(trailers));
×
618
        }
619
        responseFuture.set(value);
1✔
620
      } else {
621
        responseFuture.setException(status.asRuntimeException(trailers));
1✔
622
      }
623
    }
1✔
624

625
    @Override
626
    void onStart() {
627
      responseFuture.call.request(2);
1✔
628
    }
1✔
629
  }
630

631
  private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
632
    private final ClientCall<?, RespT> call;
633

634
    // Non private to avoid synthetic class
635
    GrpcFuture(ClientCall<?, RespT> call) {
1✔
636
      this.call = call;
1✔
637
    }
1✔
638

639
    @Override
640
    protected void interruptTask() {
641
      call.cancel("GrpcFuture was cancelled", null);
1✔
642
    }
1✔
643

644
    @Override
645
    protected boolean set(@Nullable RespT resp) {
646
      return super.set(resp);
1✔
647
    }
648

649
    @Override
650
    protected boolean setException(Throwable throwable) {
651
      return super.setException(throwable);
1✔
652
    }
653

654
    @SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped
655
    protected String pendingToString() {
656
      return MoreObjects.toStringHelper(this).add("clientCall", call).toString();
×
657
    }
658
  }
659

660
  /**
661
   * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}.
662
   *
663
   * <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
664
   * separate thread from {@link Iterator} calls.
665
   */
666
  // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
667
  private static final class BlockingResponseStream<T> implements Iterator<T> {
668
    // Due to flow control, only needs to hold up to 3 items: 2 for value, 1 for close.
669
    // (2 for value, not 1, because of early request() in next())
670
    private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
1✔
671
    private final StartableListener<T> listener = new QueuingListener();
1✔
672
    private final ClientCall<?, T> call;
673
    // Only accessed when iterating.
674
    private Object last;
675

676
    // Non private to avoid synthetic class
677
    BlockingResponseStream(ClientCall<?, T> call) {
1✔
678
      this.call = call;
1✔
679
    }
1✔
680

681
    StartableListener<T> listener() {
682
      return listener;
1✔
683
    }
684

685
    private Object waitForNext() {
686
      boolean interrupt = false;
1✔
687
      try {
688
        while (true) {
689
          try {
690
            return buffer.take();
1✔
691
          } catch (InterruptedException ie) {
1✔
692
            interrupt = true;
1✔
693
            call.cancel("Thread interrupted", ie);
1✔
694
            // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
695
          }
1✔
696
        }
697
      } finally {
698
        if (interrupt) {
1✔
699
          Thread.currentThread().interrupt();
1✔
700
        }
701
      }
702
    }
703

704
    @Override
705
    public boolean hasNext() {
706
      while (last == null) {
1✔
707
        // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
708
        // hangs here as the call will become closed.
709
        last = waitForNext();
1✔
710
      }
711
      if (last instanceof StatusRuntimeException) {
1✔
712
        // Rethrow the exception with a new stacktrace.
713
        StatusRuntimeException e = (StatusRuntimeException) last;
1✔
714
        throw e.getStatus().asRuntimeException(e.getTrailers());
1✔
715
      }
716
      return last != this;
1✔
717
    }
718

719
    @Override
720
    public T next() {
721
      // Eagerly call request(1) so it can be processing the next message while we wait for the
722
      // current one, which reduces latency for the next message. With MigratingThreadDeframer and
723
      // if the data has already been received, every other message can be delivered instantly. This
724
      // can be run after hasNext(), but just would be slower.
725
      if (!(last instanceof StatusRuntimeException) && last != this) {
1✔
726
        call.request(1);
1✔
727
      }
728
      if (!hasNext()) {
1✔
729
        throw new NoSuchElementException();
×
730
      }
731
      @SuppressWarnings("unchecked")
732
      T tmp = (T) last;
1✔
733
      last = null;
1✔
734
      return tmp;
1✔
735
    }
736

737
    @Override
738
    public void remove() {
739
      throw new UnsupportedOperationException();
×
740
    }
741

742
    private final class QueuingListener extends StartableListener<T> {
743
      // Non private to avoid synthetic class
744
      QueuingListener() {}
1✔
745

746
      private boolean done = false;
1✔
747

748
      @Override
749
      public void onHeaders(Metadata headers) {
750
      }
1✔
751

752
      @Override
753
      public void onMessage(T value) {
754
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
755
        buffer.add(value);
1✔
756
      }
1✔
757

758
      @Override
759
      public void onClose(Status status, Metadata trailers) {
760
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
761
        if (status.isOk()) {
1✔
762
          buffer.add(BlockingResponseStream.this);
1✔
763
        } else {
764
          buffer.add(status.asRuntimeException(trailers));
1✔
765
        }
766
        done = true;
1✔
767
      }
1✔
768

769
      @Override
770
      void onStart() {
771
        call.request(1);
1✔
772
      }
1✔
773
    }
774
  }
775

776
  @SuppressWarnings("serial")
777
  private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
778
      implements Executor {
779
    private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
1✔
780

781
    private static final Object SHUTDOWN = new Object(); // sentinel
1✔
782

783
    // Set to the calling thread while it's parked, SHUTDOWN on RPC completion
784
    private volatile Object waiter;
785

786
    // Non private to avoid synthetic class
787
    ThreadlessExecutor() {}
1✔
788

789
    /**
790
     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
791
     * Must only be called by one thread at a time.
792
     */
793
    public void waitAndDrain() throws InterruptedException {
794
      throwIfInterrupted();
1✔
795
      Runnable runnable = poll();
1✔
796
      if (runnable == null) {
1✔
797
        waiter = Thread.currentThread();
1✔
798
        try {
799
          while ((runnable = poll()) == null) {
1✔
800
            LockSupport.park(this);
1✔
801
            throwIfInterrupted();
1✔
802
          }
803
        } finally {
804
          waiter = null;
1✔
805
        }
806
      }
807
      do {
808
        runQuietly(runnable);
1✔
809
      } while ((runnable = poll()) != null);
1✔
810
    }
1✔
811

812
    private static void throwIfInterrupted() throws InterruptedException {
813
      if (Thread.interrupted()) {
1✔
814
        throw new InterruptedException();
1✔
815
      }
816
    }
1✔
817

818
    /**
819
     * Called after final call to {@link #waitAndDrain()}, from same thread.
820
     */
821
    public void shutdown() {
822
      waiter = SHUTDOWN;
1✔
823
      Runnable runnable;
824
      while ((runnable = poll()) != null) {
1✔
825
        runQuietly(runnable);
×
826
      }
827
    }
1✔
828

829
    private static void runQuietly(Runnable runnable) {
830
      try {
831
        runnable.run();
1✔
832
      } catch (Throwable t) {
×
833
        log.log(Level.WARNING, "Runnable threw exception", t);
×
834
      }
1✔
835
    }
1✔
836

837
    @Override
838
    public void execute(Runnable runnable) {
839
      add(runnable);
1✔
840
      Object waiter = this.waiter;
1✔
841
      if (waiter != SHUTDOWN) {
1✔
842
        LockSupport.unpark((Thread) waiter); // no-op if null
1✔
843
      } else if (remove(runnable) && rejectRunnableOnExecutor) {
1✔
844
        throw new RejectedExecutionException();
1✔
845
      }
846
    }
1✔
847
  }
848

849
  @SuppressWarnings("serial")
850
  static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
851
      implements Executor {
852
    private static final Logger log =
1✔
853
        Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());
1✔
854

855
    private final Lock waiterLock = new ReentrantLock();
1✔
856
    private final Condition waiterCondition = waiterLock.newCondition();
1✔
857

858
    // Non private to avoid synthetic class
859
    ThreadSafeThreadlessExecutor() {}
1✔
860

861
    /**
862
     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
863
     */
864
    public <T> void waitAndDrain(Predicate<T> predicate, T testTarget) throws InterruptedException {
865
      try {
866
        waitAndDrainWithTimeout(true, 0, predicate, testTarget);
×
867
      } catch (TimeoutException e) {
×
868
        throw new AssertionError(e); // Should never happen
×
869
      }
×
870
    }
×
871

872
    /**
873
     * Waits for up to specified nanoseconds until there is a Runnable, then executes it and all
874
     * queued Runnables after it.
875
     *
876
     * <p>his should always be called in a loop that checks whether the reason we are waiting has
877
     * been satisfied.</p>T
878
     *
879
     * @param waitForever ignore the rest of the arguments and wait until there is a task to run
880
     * @param end System.nanoTime() to stop waiting if haven't been woken up yet
881
     * @param predicate non-null condition to test for skipping wake or waking up threads
882
     * @param testTarget object to pass to predicate
883
     */
884
    public <T> void waitAndDrainWithTimeout(boolean waitForever, long end,
885
                                            @Nonnull Predicate<T> predicate, T testTarget)
886
        throws InterruptedException, TimeoutException {
887
      throwIfInterrupted();
1✔
888
      Runnable runnable;
889

890
      while (!predicate.apply(testTarget)) {
1✔
891
        waiterLock.lock();
1✔
892
        try {
893
          while ((runnable = poll()) == null) {
1✔
894
            if (predicate.apply(testTarget)) {
1✔
895
              return; // The condition for which we were waiting is now satisfied
×
896
            }
897

898
            if (waitForever) {
1✔
899
              waiterCondition.await();
1✔
900
            } else {
901
              long waitNanos = end - System.nanoTime();
1✔
902
              if (waitNanos <= 0) {
1✔
903
                throw new TimeoutException(); // Deadline is expired
1✔
904
              }
905
              waiterCondition.awaitNanos(waitNanos);
1✔
906
            }
1✔
907
          }
908
        } finally {
909
          waiterLock.unlock();
1✔
910
        }
911

912
        do {
913
          runQuietly(runnable);
1✔
914
        } while ((runnable = poll()) != null);
1✔
915
        // Wake everything up now that we've done something and they can check in their outer loop
916
        // if they can continue or need to wait again.
917
        signallAll();
1✔
918
      }
919
    }
1✔
920

921
    /**
922
     * Executes all queued Runnables and if there were any wakes up any waiting threads.
923
     */
924
    public void drain() throws InterruptedException {
925
      throwIfInterrupted();
1✔
926
      Runnable runnable;
927
      boolean didWork = false;
1✔
928

929
      while ((runnable = poll()) != null) {
1✔
930
        runQuietly(runnable);
1✔
931
        didWork = true;
1✔
932
      }
933

934
      if (didWork) {
1✔
935
        signallAll();
1✔
936
      }
937
    }
1✔
938

939
    private void signallAll() {
940
      waiterLock.lock();
1✔
941
      try {
942
        waiterCondition.signalAll();
1✔
943
      } finally {
944
        waiterLock.unlock();
1✔
945
      }
946
    }
1✔
947

948
    private static void runQuietly(Runnable runnable) {
949
      try {
950
        runnable.run();
1✔
951
      } catch (Throwable t) {
×
952
        log.log(Level.WARNING, "Runnable threw exception", t);
×
953
      }
1✔
954
    }
1✔
955

956
    private static void throwIfInterrupted() throws InterruptedException {
957
      if (Thread.interrupted()) {
1✔
958
        throw new InterruptedException();
×
959
      }
960
    }
1✔
961

962
    @Override
963
    public void execute(Runnable runnable) {
964
      waiterLock.lock();
1✔
965
      try {
966
        add(runnable);
1✔
967
        waiterCondition.signalAll(); // If anything is waiting let it wake up and process this task
1✔
968
      } finally {
969
        waiterLock.unlock();
1✔
970
      }
971
    }
1✔
972
  }
973

974
  enum StubType {
1✔
975
    BLOCKING, FUTURE, ASYNC
1✔
976
  }
977

978
  /**
979
   * Internal {@link CallOptions.Key} to indicate stub types.
980
   */
981
  static final CallOptions.Key<StubType> STUB_TYPE_OPTION =
1✔
982
      CallOptions.Key.create("internal-stub-type");
1✔
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc