• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18726

pending completion
#18726

push

github-actions

web-flow
Stabilize setExtensionRegistry() of ProtoLiteUtils and ProtoUtils (#10392)

Closes #1787

29151 of 33044 relevant lines covered (88.22%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.39
/../stub/src/main/java/io/grpc/stub/ClientCalls.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Strings;
26
import com.google.common.util.concurrent.AbstractFuture;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import io.grpc.CallOptions;
29
import io.grpc.Channel;
30
import io.grpc.ClientCall;
31
import io.grpc.Metadata;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.StatusRuntimeException;
36
import java.util.Iterator;
37
import java.util.NoSuchElementException;
38
import java.util.concurrent.ArrayBlockingQueue;
39
import java.util.concurrent.BlockingQueue;
40
import java.util.concurrent.ConcurrentLinkedQueue;
41
import java.util.concurrent.ExecutionException;
42
import java.util.concurrent.Executor;
43
import java.util.concurrent.Future;
44
import java.util.concurrent.RejectedExecutionException;
45
import java.util.concurrent.locks.LockSupport;
46
import java.util.logging.Level;
47
import java.util.logging.Logger;
48
import javax.annotation.Nullable;
49

50
/**
51
 * Utility functions for processing different call idioms. We have one-to-one correspondence
52
 * between utilities in this class and the potential signatures in a generated stub class so
53
 * that the runtime can vary behavior without requiring regeneration of the stub.
54
 */
55
public final class ClientCalls {
56

57
  private static final Logger logger = Logger.getLogger(ClientCalls.class.getName());

  /**
   * Whether a {@code ThreadlessExecutor} that has already been shut down throws
   * {@link RejectedExecutionException} for a runnable submitted afterwards. Initialized from the
   * {@code GRPC_CLIENT_CALL_REJECT_RUNNABLE} environment variable; an unset or empty variable
   * yields {@code false}.
   */
  @VisibleForTesting
  static boolean rejectRunnableOnExecutor =
      Boolean.parseBoolean(
          Strings.nullToEmpty(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE")));

  // Static utility holder: never instantiated.
  private ClientCalls() {}
66

67
  /**
68
   * Executes a unary call with a response {@link StreamObserver}.  The {@code call} should not be
69
   * already started.  After calling this method, {@code call} should no longer be used.
70
   *
71
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
72
   * {@code beforeStart()} will be called.
73
   */
74
  public static <ReqT, RespT> void asyncUnaryCall(
75
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
76
    checkNotNull(responseObserver, "responseObserver");
1✔
77
    asyncUnaryRequestCall(call, req, responseObserver, false);
1✔
78
  }
1✔
79

80
  /**
81
   * Executes a server-streaming call with a response {@link StreamObserver}.  The {@code call}
82
   * should not be already started.  After calling this method, {@code call} should no longer be
83
   * used.
84
   *
85
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
86
   * {@code beforeStart()} will be called.
87
   */
88
  public static <ReqT, RespT> void asyncServerStreamingCall(
89
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
90
    checkNotNull(responseObserver, "responseObserver");
1✔
91
    asyncUnaryRequestCall(call, req, responseObserver, true);
1✔
92
  }
1✔
93

94
  /**
95
   * Executes a client-streaming call returning a {@link StreamObserver} for the request messages.
96
   * The {@code call} should not be already started.  After calling this method, {@code call}
97
   * should no longer be used.
98
   *
99
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
100
   * {@code beforeStart()} will be called.
101
   *
102
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
103
   */
104
  public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
105
      ClientCall<ReqT, RespT> call,
106
      StreamObserver<RespT> responseObserver) {
107
    checkNotNull(responseObserver, "responseObserver");
1✔
108
    return asyncStreamingRequestCall(call, responseObserver, false);
1✔
109
  }
110

111
  /**
112
   * Executes a bidirectional-streaming call.  The {@code call} should not be already started.
113
   * After calling this method, {@code call} should no longer be used.
114
   *
115
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
116
   * {@code beforeStart()} will be called.
117
   *
118
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
119
   */
120
  public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
121
      ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
122
    checkNotNull(responseObserver, "responseObserver");
1✔
123
    return asyncStreamingRequestCall(call, responseObserver, true);
1✔
124
  }
125

126
  /**
127
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
128
   * started.  After calling this method, {@code call} should no longer be used.
129
   *
130
   * @return the single response message.
131
   * @throws StatusRuntimeException on error
132
   */
133
  public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
134
    try {
135
      return getUnchecked(futureUnaryCall(call, req));
1✔
136
    } catch (RuntimeException e) {
1✔
137
      throw cancelThrow(call, e);
×
138
    } catch (Error e) {
×
139
      throw cancelThrow(call, e);
×
140
    }
141
  }
142

143
  /**
144
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
145
   * started.  After calling this method, {@code call} should no longer be used.
146
   *
147
   * @return the single response message.
148
   * @throws StatusRuntimeException on error
149
   */
150
  public static <ReqT, RespT> RespT blockingUnaryCall(
151
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
152
    ThreadlessExecutor executor = new ThreadlessExecutor();
1✔
153
    boolean interrupt = false;
1✔
154
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
155
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
1✔
156
            .withExecutor(executor));
1✔
157
    try {
158
      ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
1✔
159
      while (!responseFuture.isDone()) {
1✔
160
        try {
161
          executor.waitAndDrain();
1✔
162
        } catch (InterruptedException e) {
1✔
163
          interrupt = true;
1✔
164
          call.cancel("Thread interrupted", e);
1✔
165
          // Now wait for onClose() to be called, so interceptors can clean up
166
        }
1✔
167
      }
168
      executor.shutdown();
1✔
169
      return getUnchecked(responseFuture);
1✔
170
    } catch (RuntimeException e) {
1✔
171
      // Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
172
      throw cancelThrow(call, e);
×
173
    } catch (Error e) {
×
174
      // Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
175
      throw cancelThrow(call, e);
×
176
    } finally {
177
      if (interrupt) {
1✔
178
        Thread.currentThread().interrupt();
1✔
179
      }
180
    }
181
  }
182

183
  /**
184
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
185
   * response stream.  The {@code call} should not be already started.  After calling this method,
186
   * {@code call} should no longer be used.
187
   *
188
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
189
   *
190
   * @return an iterator over the response stream.
191
   */
192
  // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
193
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
194
      ClientCall<ReqT, RespT> call, ReqT req) {
195
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
1✔
196
    asyncUnaryRequestCall(call, req, result.listener());
1✔
197
    return result;
1✔
198
  }
199

200
  /**
201
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
202
   * response stream.  The {@code call} should not be already started.  After calling this method,
203
   * {@code call} should no longer be used.
204
   *
205
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
206
   *
207
   * @return an iterator over the response stream.
208
   */
209
  // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
210
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
211
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
212
    ThreadlessExecutor executor = new ThreadlessExecutor();
1✔
213
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
214
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
1✔
215
            .withExecutor(executor));
1✔
216
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
1✔
217
    asyncUnaryRequestCall(call, req, result.listener());
1✔
218
    return result;
1✔
219
  }
220

221
  /**
222
   * Executes a unary call and returns a {@link ListenableFuture} to the response.  The
223
   * {@code call} should not be already started.  After calling this method, {@code call} should no
224
   * longer be used.
225
   *
226
   * @return a future for the single response message.
227
   */
228
  public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
229
      ClientCall<ReqT, RespT> call, ReqT req) {
230
    GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
1✔
231
    asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
1✔
232
    return responseFuture;
1✔
233
  }
234

235
  /**
236
   * Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a
237
   * checked exception.
238
   *
239
   * <p>If interrupted, the interrupt is restored before throwing an exception..
240
   *
241
   * @throws java.util.concurrent.CancellationException
242
   *     if {@code get} throws a {@code CancellationException}.
243
   * @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException}
244
   *     or an {@link InterruptedException}.
245
   */
246
  private static <V> V getUnchecked(Future<V> future) {
247
    try {
248
      return future.get();
1✔
249
    } catch (InterruptedException e) {
×
250
      Thread.currentThread().interrupt();
×
251
      throw Status.CANCELLED
×
252
          .withDescription("Thread interrupted")
×
253
          .withCause(e)
×
254
          .asRuntimeException();
×
255
    } catch (ExecutionException e) {
1✔
256
      throw toStatusRuntimeException(e.getCause());
1✔
257
    }
258
  }
259

260
  /**
261
   * Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an
262
   * embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will
263
   * contain the embedded trailers and status, with the given exception as the cause. Otherwise, an
264
   * exception will be generated from an {@link Status#UNKNOWN} status.
265
   */
266
  private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
267
    Throwable cause = checkNotNull(t, "t");
1✔
268
    while (cause != null) {
1✔
269
      // If we have an embedded status, use it and replace the cause
270
      if (cause instanceof StatusException) {
1✔
271
        StatusException se = (StatusException) cause;
×
272
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
×
273
      } else if (cause instanceof StatusRuntimeException) {
1✔
274
        StatusRuntimeException se = (StatusRuntimeException) cause;
1✔
275
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
1✔
276
      }
277
      cause = cause.getCause();
×
278
    }
279
    return Status.UNKNOWN.withDescription("unexpected exception").withCause(t)
×
280
        .asRuntimeException();
×
281
  }
282

283
  /**
284
   * Cancels a call, and throws the exception.
285
   *
286
   * @param t must be a RuntimeException or Error
287
   */
288
  private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
289
    try {
290
      call.cancel(null, t);
1✔
291
    } catch (Throwable e) {
×
292
      assert e instanceof RuntimeException || e instanceof Error;
×
293
      logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
×
294
    }
1✔
295
    if (t instanceof RuntimeException) {
1✔
296
      throw (RuntimeException) t;
1✔
297
    } else if (t instanceof Error) {
×
298
      throw (Error) t;
×
299
    }
300
    // should be impossible
301
    throw new AssertionError(t);
×
302
  }
303

304
  private static <ReqT, RespT> void asyncUnaryRequestCall(
305
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver,
306
      boolean streamingResponse) {
307
    asyncUnaryRequestCall(
1✔
308
        call,
309
        req,
310
        new StreamObserverToCallListenerAdapter<>(
311
            responseObserver,
312
            new CallToStreamObserverAdapter<>(call, streamingResponse)));
313
  }
1✔
314

315
  private static <ReqT, RespT> void asyncUnaryRequestCall(
316
      ClientCall<ReqT, RespT> call,
317
      ReqT req,
318
      StartableListener<RespT> responseListener) {
319
    startCall(call, responseListener);
1✔
320
    try {
321
      call.sendMessage(req);
1✔
322
      call.halfClose();
1✔
323
    } catch (RuntimeException e) {
×
324
      throw cancelThrow(call, e);
×
325
    } catch (Error e) {
×
326
      throw cancelThrow(call, e);
×
327
    }
1✔
328
  }
1✔
329

330
  private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
331
      ClientCall<ReqT, RespT> call,
332
      StreamObserver<RespT> responseObserver,
333
      boolean streamingResponse) {
334
    CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(
1✔
335
        call, streamingResponse);
336
    startCall(
1✔
337
        call,
338
        new StreamObserverToCallListenerAdapter<>(responseObserver, adapter));
339
    return adapter;
1✔
340
  }
341

342
  private static <ReqT, RespT> void startCall(
343
      ClientCall<ReqT, RespT> call,
344
      StartableListener<RespT> responseListener) {
345
    call.start(responseListener, new Metadata());
1✔
346
    responseListener.onStart();
1✔
347
  }
1✔
348

349
  private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
350
    abstract void onStart();
351
  }
352

353
  private static final class CallToStreamObserverAdapter<ReqT>
354
      extends ClientCallStreamObserver<ReqT> {
355
    private boolean frozen;
356
    private final ClientCall<ReqT, ?> call;
357
    private final boolean streamingResponse;
358
    private Runnable onReadyHandler;
359
    private int initialRequest = 1;
1✔
360
    private boolean autoRequestEnabled = true;
1✔
361
    private boolean aborted = false;
1✔
362
    private boolean completed = false;
1✔
363

364
    // Non private to avoid synthetic class
365
    CallToStreamObserverAdapter(ClientCall<ReqT, ?> call, boolean streamingResponse) {
1✔
366
      this.call = call;
1✔
367
      this.streamingResponse = streamingResponse;
1✔
368
    }
1✔
369

370
    private void freeze() {
371
      this.frozen = true;
1✔
372
    }
1✔
373

374
    @Override
375
    public void onNext(ReqT value) {
376
      checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
1✔
377
      checkState(!completed, "Stream is already completed, no further calls are allowed");
1✔
378
      call.sendMessage(value);
1✔
379
    }
1✔
380

381
    @Override
382
    public void onError(Throwable t) {
383
      call.cancel("Cancelled by client with StreamObserver.onError()", t);
1✔
384
      aborted = true;
1✔
385
    }
1✔
386

387
    @Override
388
    public void onCompleted() {
389
      call.halfClose();
1✔
390
      completed = true;
1✔
391
    }
1✔
392

393
    @Override
394
    public boolean isReady() {
395
      return call.isReady();
1✔
396
    }
397

398
    @Override
399
    public void setOnReadyHandler(Runnable onReadyHandler) {
400
      if (frozen) {
1✔
401
        throw new IllegalStateException(
1✔
402
            "Cannot alter onReadyHandler after call started. Use ClientResponseObserver");
403
      }
404
      this.onReadyHandler = onReadyHandler;
1✔
405
    }
1✔
406

407
    @Override
408
    public void disableAutoInboundFlowControl() {
409
      disableAutoRequestWithInitial(1);
×
410
    }
×
411

412
    @Override
413
    public void disableAutoRequestWithInitial(int request) {
414
      if (frozen) {
1✔
415
        throw new IllegalStateException(
×
416
            "Cannot disable auto flow control after call started. Use ClientResponseObserver");
417
      }
418
      Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
1✔
419
      initialRequest = request;
1✔
420
      autoRequestEnabled = false;
1✔
421
    }
1✔
422

423
    @Override
424
    public void request(int count) {
425
      if (!streamingResponse && count == 1) {
1✔
426
        // Initially ask for two responses from flow-control so that if a misbehaving server
427
        // sends more than one responses, we can catch it and fail it in the listener.
428
        call.request(2);
1✔
429
      } else {
430
        call.request(count);
1✔
431
      }
432
    }
1✔
433

434
    @Override
435
    public void setMessageCompression(boolean enable) {
436
      call.setMessageCompression(enable);
×
437
    }
×
438

439
    @Override
440
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
441
      call.cancel(message, cause);
×
442
    }
×
443
  }
444

445
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
446
      extends StartableListener<RespT> {
447
    private final StreamObserver<RespT> observer;
448
    private final CallToStreamObserverAdapter<ReqT> adapter;
449
    private boolean firstResponseReceived;
450

451
    // Non private to avoid synthetic class
452
    StreamObserverToCallListenerAdapter(
453
        StreamObserver<RespT> observer,
454
        CallToStreamObserverAdapter<ReqT> adapter) {
1✔
455
      this.observer = observer;
1✔
456
      this.adapter = adapter;
1✔
457
      if (observer instanceof ClientResponseObserver) {
1✔
458
        @SuppressWarnings("unchecked")
459
        ClientResponseObserver<ReqT, RespT> clientResponseObserver =
1✔
460
            (ClientResponseObserver<ReqT, RespT>) observer;
461
        clientResponseObserver.beforeStart(adapter);
1✔
462
      }
463
      adapter.freeze();
1✔
464
    }
1✔
465

466
    @Override
467
    public void onHeaders(Metadata headers) {
468
    }
1✔
469

470
    @Override
471
    public void onMessage(RespT message) {
472
      if (firstResponseReceived && !adapter.streamingResponse) {
1✔
473
        throw Status.INTERNAL
×
474
            .withDescription("More than one responses received for unary or client-streaming call")
×
475
            .asRuntimeException();
×
476
      }
477
      firstResponseReceived = true;
1✔
478
      observer.onNext(message);
1✔
479

480
      if (adapter.streamingResponse && adapter.autoRequestEnabled) {
1✔
481
        // Request delivery of the next inbound message.
482
        adapter.request(1);
1✔
483
      }
484
    }
1✔
485

486
    @Override
487
    public void onClose(Status status, Metadata trailers) {
488
      if (status.isOk()) {
1✔
489
        observer.onCompleted();
1✔
490
      } else {
491
        observer.onError(status.asRuntimeException(trailers));
1✔
492
      }
493
    }
1✔
494

495
    @Override
496
    public void onReady() {
497
      if (adapter.onReadyHandler != null) {
1✔
498
        adapter.onReadyHandler.run();
1✔
499
      }
500
    }
1✔
501

502
    @Override
503
    void onStart() {
504
      if (adapter.initialRequest > 0) {
1✔
505
        adapter.request(adapter.initialRequest);
1✔
506
      }
507
    }
1✔
508
  }
509

510
  /**
511
   * Completes a {@link GrpcFuture} using {@link StreamObserver} events.
512
   */
513
  private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
514
    private final GrpcFuture<RespT> responseFuture;
515
    private RespT value;
516
    private boolean isValueReceived = false;
1✔
517

518
    // Non private to avoid synthetic class
519
    UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) {
1✔
520
      this.responseFuture = responseFuture;
1✔
521
    }
1✔
522

523
    @Override
524
    public void onHeaders(Metadata headers) {
525
    }
1✔
526

527
    @Override
528
    public void onMessage(RespT value) {
529
      if (this.isValueReceived) {
1✔
530
        throw Status.INTERNAL.withDescription("More than one value received for unary call")
×
531
            .asRuntimeException();
×
532
      }
533
      this.value = value;
1✔
534
      this.isValueReceived = true;
1✔
535
    }
1✔
536

537
    @Override
538
    public void onClose(Status status, Metadata trailers) {
539
      if (status.isOk()) {
1✔
540
        if (!isValueReceived) {
1✔
541
          // No value received so mark the future as an error
542
          responseFuture.setException(
×
543
              Status.INTERNAL.withDescription("No value received for unary call")
×
544
                  .asRuntimeException(trailers));
×
545
        }
546
        responseFuture.set(value);
1✔
547
      } else {
548
        responseFuture.setException(status.asRuntimeException(trailers));
1✔
549
      }
550
    }
1✔
551

552
    @Override
553
    void onStart() {
554
      responseFuture.call.request(2);
1✔
555
    }
1✔
556
  }
557

558
  private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
559
    private final ClientCall<?, RespT> call;
560

561
    // Non private to avoid synthetic class
562
    GrpcFuture(ClientCall<?, RespT> call) {
1✔
563
      this.call = call;
1✔
564
    }
1✔
565

566
    @Override
567
    protected void interruptTask() {
568
      call.cancel("GrpcFuture was cancelled", null);
1✔
569
    }
1✔
570

571
    @Override
572
    protected boolean set(@Nullable RespT resp) {
573
      return super.set(resp);
1✔
574
    }
575

576
    @Override
577
    protected boolean setException(Throwable throwable) {
578
      return super.setException(throwable);
1✔
579
    }
580

581
    @SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped
582
    protected String pendingToString() {
583
      return MoreObjects.toStringHelper(this).add("clientCall", call).toString();
×
584
    }
585
  }
586

587
  /**
588
   * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}.
589
   *
590
   * <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
591
   * separate thread from {@link Iterator} calls.
592
   */
593
  // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
594
  private static final class BlockingResponseStream<T> implements Iterator<T> {
595
    // Due to flow control, only needs to hold up to 3 items: 2 for value, 1 for close.
596
    // (2 for value, not 1, because of early request() in next())
597
    private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
1✔
598
    private final StartableListener<T> listener = new QueuingListener();
1✔
599
    private final ClientCall<?, T> call;
600
    /** May be null. */
601
    private final ThreadlessExecutor threadless;
602
    // Only accessed when iterating.
603
    private Object last;
604

605
    // Non private to avoid synthetic class
606
    BlockingResponseStream(ClientCall<?, T> call) {
607
      this(call, null);
1✔
608
    }
1✔
609

610
    // Non private to avoid synthetic class
611
    BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
1✔
612
      this.call = call;
1✔
613
      this.threadless = threadless;
1✔
614
    }
1✔
615

616
    StartableListener<T> listener() {
617
      return listener;
1✔
618
    }
619

620
    private Object waitForNext() {
621
      boolean interrupt = false;
1✔
622
      try {
623
        if (threadless == null) {
1✔
624
          while (true) {
625
            try {
626
              return buffer.take();
1✔
627
            } catch (InterruptedException ie) {
1✔
628
              interrupt = true;
1✔
629
              call.cancel("Thread interrupted", ie);
1✔
630
              // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
631
            }
1✔
632
          }
633
        } else {
634
          Object next;
635
          while ((next = buffer.poll()) == null) {
1✔
636
            try {
637
              threadless.waitAndDrain();
1✔
638
            } catch (InterruptedException ie) {
1✔
639
              interrupt = true;
1✔
640
              call.cancel("Thread interrupted", ie);
1✔
641
              // Now wait for onClose() to be called, so interceptors can clean up
642
            }
1✔
643
          }
644
          if (next == this || next instanceof StatusRuntimeException) {
1✔
645
            threadless.shutdown();
1✔
646
          }
647
          return next;
1✔
648
        }
649
      } finally {
650
        if (interrupt) {
1✔
651
          Thread.currentThread().interrupt();
1✔
652
        }
653
      }
654
    }
655

656
    @Override
657
    public boolean hasNext() {
658
      while (last == null) {
1✔
659
        // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
660
        // hangs here as the call will become closed.
661
        last = waitForNext();
1✔
662
      }
663
      if (last instanceof StatusRuntimeException) {
1✔
664
        // Rethrow the exception with a new stacktrace.
665
        StatusRuntimeException e = (StatusRuntimeException) last;
1✔
666
        throw e.getStatus().asRuntimeException(e.getTrailers());
1✔
667
      }
668
      return last != this;
1✔
669
    }
670

671
    @Override
672
    public T next() {
673
      // Eagerly call request(1) so it can be processing the next message while we wait for the
674
      // current one, which reduces latency for the next message. With MigratingThreadDeframer and
675
      // if the data has already been recieved, every other message can be delivered instantly. This
676
      // can be run after hasNext(), but just would be slower.
677
      if (!(last instanceof StatusRuntimeException) && last != this) {
1✔
678
        call.request(1);
1✔
679
      }
680
      if (!hasNext()) {
1✔
681
        throw new NoSuchElementException();
×
682
      }
683
      @SuppressWarnings("unchecked")
684
      T tmp = (T) last;
1✔
685
      last = null;
1✔
686
      return tmp;
1✔
687
    }
688

689
    @Override
690
    public void remove() {
691
      throw new UnsupportedOperationException();
×
692
    }
693

694
    private final class QueuingListener extends StartableListener<T> {
695
      // Non private to avoid synthetic class
696
      QueuingListener() {}
1✔
697

698
      private boolean done = false;
1✔
699

700
      @Override
701
      public void onHeaders(Metadata headers) {
702
      }
1✔
703

704
      @Override
705
      public void onMessage(T value) {
706
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
707
        buffer.add(value);
1✔
708
      }
1✔
709

710
      @Override
711
      public void onClose(Status status, Metadata trailers) {
712
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
713
        if (status.isOk()) {
1✔
714
          buffer.add(BlockingResponseStream.this);
1✔
715
        } else {
716
          buffer.add(status.asRuntimeException(trailers));
1✔
717
        }
718
        done = true;
1✔
719
      }
1✔
720

721
      @Override
722
      void onStart() {
723
        call.request(1);
1✔
724
      }
1✔
725
    }
726
  }
727

728
  @SuppressWarnings("serial")
729
  private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
730
      implements Executor {
731
    private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
1✔
732

733
    private static final Object SHUTDOWN = new Object(); // sentinel
1✔
734

735
    // Set to the calling thread while it's parked, SHUTDOWN on RPC completion
736
    private volatile Object waiter;
737

738
    // Non private to avoid synthetic class
739
    ThreadlessExecutor() {}
1✔
740

741
    /**
742
     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
743
     * Must only be called by one thread at a time.
744
     */
745
    public void waitAndDrain() throws InterruptedException {
746
      throwIfInterrupted();
1✔
747
      Runnable runnable = poll();
1✔
748
      if (runnable == null) {
1✔
749
        waiter = Thread.currentThread();
1✔
750
        try {
751
          while ((runnable = poll()) == null) {
1✔
752
            LockSupport.park(this);
1✔
753
            throwIfInterrupted();
1✔
754
          }
755
        } finally {
756
          waiter = null;
1✔
757
        }
758
      }
759
      do {
760
        runQuietly(runnable);
1✔
761
      } while ((runnable = poll()) != null);
1✔
762
    }
1✔
763

764
    /**
765
     * Called after final call to {@link #waitAndDrain()}, from same thread.
766
     */
767
    public void shutdown() {
768
      waiter = SHUTDOWN;
1✔
769
      Runnable runnable;
770
      while ((runnable = poll()) != null) {
1✔
771
        runQuietly(runnable);
×
772
      }
773
    }
1✔
774

775
    private static void runQuietly(Runnable runnable) {
776
      try {
777
        runnable.run();
1✔
778
      } catch (Throwable t) {
×
779
        log.log(Level.WARNING, "Runnable threw exception", t);
×
780
      }
1✔
781
    }
1✔
782

783
    private static void throwIfInterrupted() throws InterruptedException {
784
      if (Thread.interrupted()) {
1✔
785
        throw new InterruptedException();
1✔
786
      }
787
    }
1✔
788

789
    @Override
790
    public void execute(Runnable runnable) {
791
      add(runnable);
1✔
792
      Object waiter = this.waiter;
1✔
793
      if (waiter != SHUTDOWN) {
1✔
794
        LockSupport.unpark((Thread) waiter); // no-op if null
1✔
795
      } else if (remove(runnable) && rejectRunnableOnExecutor) {
1✔
796
        throw new RejectedExecutionException();
1✔
797
      }
798
    }
1✔
799
  }
800

801
  /** The kind of stub a call was issued from; carried in {@code STUB_TYPE_OPTION}. */
  enum StubType {
    BLOCKING,
    FUTURE,
    ASYNC
  }
804

805
  /**
806
   * Internal {@link CallOptions.Key} to indicate stub types.
807
   */
808
  static final CallOptions.Key<StubType> STUB_TYPE_OPTION =
1✔
809
      CallOptions.Key.create("internal-stub-type");
1✔
810
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc