• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19221

09 May 2024 05:48PM UTC coverage: 88.388% (-0.02%) from 88.408%
#19221

push

github

ejona86
xds: Add WRR metric test with real channel

31582 of 35731 relevant lines covered (88.39%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.64
/../stub/src/main/java/io/grpc/stub/ClientCalls.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Strings;
26
import com.google.common.util.concurrent.AbstractFuture;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import io.grpc.CallOptions;
29
import io.grpc.Channel;
30
import io.grpc.ClientCall;
31
import io.grpc.Metadata;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.StatusRuntimeException;
36
import java.util.Iterator;
37
import java.util.NoSuchElementException;
38
import java.util.concurrent.ArrayBlockingQueue;
39
import java.util.concurrent.BlockingQueue;
40
import java.util.concurrent.ConcurrentLinkedQueue;
41
import java.util.concurrent.ExecutionException;
42
import java.util.concurrent.Executor;
43
import java.util.concurrent.Future;
44
import java.util.concurrent.RejectedExecutionException;
45
import java.util.concurrent.locks.LockSupport;
46
import java.util.logging.Level;
47
import java.util.logging.Logger;
48
import javax.annotation.Nullable;
49

50
/**
51
 * Utility functions for processing different call idioms. We have one-to-one correspondence
52
 * between utilities in this class and the potential signatures in a generated stub class so
53
 * that the runtime can vary behavior without requiring regeneration of the stub.
54
 */
55
public final class ClientCalls {
56

57
  private static final Logger logger = Logger.getLogger(ClientCalls.class.getName());
1✔
58

59
  @VisibleForTesting
60
  static boolean rejectRunnableOnExecutor =
1✔
61
      !Strings.isNullOrEmpty(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE"))
1✔
62
          && Boolean.parseBoolean(System.getenv("GRPC_CLIENT_CALL_REJECT_RUNNABLE"));
1✔
63

64
  // Prevent instantiation
65
  private ClientCalls() {}
66

67
  /**
68
   * Executes a unary call with a response {@link StreamObserver}.  The {@code call} should not be
69
   * already started.  After calling this method, {@code call} should no longer be used.
70
   *
71
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
72
   * {@code beforeStart()} will be called.
73
   */
74
  public static <ReqT, RespT> void asyncUnaryCall(
75
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
76
    checkNotNull(responseObserver, "responseObserver");
1✔
77
    asyncUnaryRequestCall(call, req, responseObserver, false);
1✔
78
  }
1✔
79

80
  /**
81
   * Executes a server-streaming call with a response {@link StreamObserver}.  The {@code call}
82
   * should not be already started.  After calling this method, {@code call} should no longer be
83
   * used.
84
   *
85
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
86
   * {@code beforeStart()} will be called.
87
   */
88
  public static <ReqT, RespT> void asyncServerStreamingCall(
89
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
90
    checkNotNull(responseObserver, "responseObserver");
1✔
91
    asyncUnaryRequestCall(call, req, responseObserver, true);
1✔
92
  }
1✔
93

94
  /**
95
   * Executes a client-streaming call returning a {@link StreamObserver} for the request messages.
96
   * The {@code call} should not be already started.  After calling this method, {@code call}
97
   * should no longer be used.
98
   *
99
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
100
   * {@code beforeStart()} will be called.
101
   *
102
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
103
   */
104
  public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
105
      ClientCall<ReqT, RespT> call,
106
      StreamObserver<RespT> responseObserver) {
107
    checkNotNull(responseObserver, "responseObserver");
1✔
108
    return asyncStreamingRequestCall(call, responseObserver, false);
1✔
109
  }
110

111
  /**
112
   * Executes a bidirectional-streaming call.  The {@code call} should not be already started.
113
   * After calling this method, {@code call} should no longer be used.
114
   *
115
   * <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
116
   * {@code beforeStart()} will be called.
117
   *
118
   * @return request stream observer. It will extend {@link ClientCallStreamObserver}
119
   */
120
  public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
121
      ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
122
    checkNotNull(responseObserver, "responseObserver");
1✔
123
    return asyncStreamingRequestCall(call, responseObserver, true);
1✔
124
  }
125

126
  /**
127
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
128
   * started.  After calling this method, {@code call} should no longer be used.
129
   *
130
   * @return the single response message.
131
   * @throws StatusRuntimeException on error
132
   */
133
  public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
134
    try {
135
      return getUnchecked(futureUnaryCall(call, req));
1✔
136
    } catch (RuntimeException | Error e) {
1✔
137
      throw cancelThrow(call, e);
×
138
    }
139
  }
140

141
  /**
142
   * Executes a unary call and blocks on the response.  The {@code call} should not be already
143
   * started.  After calling this method, {@code call} should no longer be used.
144
   *
145
   * @return the single response message.
146
   * @throws StatusRuntimeException on error
147
   */
148
  public static <ReqT, RespT> RespT blockingUnaryCall(
149
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
150
    ThreadlessExecutor executor = new ThreadlessExecutor();
1✔
151
    boolean interrupt = false;
1✔
152
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
153
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
1✔
154
            .withExecutor(executor));
1✔
155
    try {
156
      ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
1✔
157
      while (!responseFuture.isDone()) {
1✔
158
        try {
159
          executor.waitAndDrain();
1✔
160
        } catch (InterruptedException e) {
1✔
161
          interrupt = true;
1✔
162
          call.cancel("Thread interrupted", e);
1✔
163
          // Now wait for onClose() to be called, so interceptors can clean up
164
        }
1✔
165
      }
166
      executor.shutdown();
1✔
167
      return getUnchecked(responseFuture);
1✔
168
    } catch (RuntimeException | Error e) {
1✔
169
      // Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
170
      throw cancelThrow(call, e);
×
171
    } finally {
172
      if (interrupt) {
1✔
173
        Thread.currentThread().interrupt();
1✔
174
      }
175
    }
176
  }
177

178
  /**
179
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
180
   * response stream.  The {@code call} should not be already started.  After calling this method,
181
   * {@code call} should no longer be used.
182
   *
183
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
184
   *
185
   * @return an iterator over the response stream.
186
   */
187
  // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
188
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
189
      ClientCall<ReqT, RespT> call, ReqT req) {
190
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
1✔
191
    asyncUnaryRequestCall(call, req, result.listener());
1✔
192
    return result;
1✔
193
  }
194

195
  /**
196
   * Executes a server-streaming call returning a blocking {@link Iterator} over the
197
   * response stream.  The {@code call} should not be already started.  After calling this method,
198
   * {@code call} should no longer be used.
199
   *
200
   * <p>The returned iterator may throw {@link StatusRuntimeException} on error.
201
   *
202
   * @return an iterator over the response stream.
203
   */
204
  public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
205
      Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
206
    ClientCall<ReqT, RespT> call = channel.newCall(method,
1✔
207
        callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));
1✔
208

209
    BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
1✔
210
    asyncUnaryRequestCall(call, req, result.listener());
1✔
211
    return result;
1✔
212
  }
213

214
  /**
215
   * Executes a unary call and returns a {@link ListenableFuture} to the response.  The
216
   * {@code call} should not be already started.  After calling this method, {@code call} should no
217
   * longer be used.
218
   *
219
   * @return a future for the single response message.
220
   */
221
  public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
222
      ClientCall<ReqT, RespT> call, ReqT req) {
223
    GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
1✔
224
    asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
1✔
225
    return responseFuture;
1✔
226
  }
227

228
  /**
229
   * Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a
230
   * checked exception.
231
   *
232
   * <p>If interrupted, the interrupt is restored before throwing an exception.
233
   *
234
   * @throws java.util.concurrent.CancellationException
235
   *     if {@code get} throws a {@code CancellationException}.
236
   * @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException}
237
   *     or an {@link InterruptedException}.
238
   */
239
  private static <V> V getUnchecked(Future<V> future) {
240
    try {
241
      return future.get();
1✔
242
    } catch (InterruptedException e) {
×
243
      Thread.currentThread().interrupt();
×
244
      throw Status.CANCELLED
×
245
          .withDescription("Thread interrupted")
×
246
          .withCause(e)
×
247
          .asRuntimeException();
×
248
    } catch (ExecutionException e) {
1✔
249
      throw toStatusRuntimeException(e.getCause());
1✔
250
    }
251
  }
252

253
  /**
254
   * Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an
255
   * embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will
256
   * contain the embedded trailers and status, with the given exception as the cause. Otherwise, an
257
   * exception will be generated from an {@link Status#UNKNOWN} status.
258
   */
259
  private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
260
    Throwable cause = checkNotNull(t, "t");
1✔
261
    while (cause != null) {
1✔
262
      // If we have an embedded status, use it and replace the cause
263
      if (cause instanceof StatusException) {
1✔
264
        StatusException se = (StatusException) cause;
×
265
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
×
266
      } else if (cause instanceof StatusRuntimeException) {
1✔
267
        StatusRuntimeException se = (StatusRuntimeException) cause;
1✔
268
        return new StatusRuntimeException(se.getStatus(), se.getTrailers());
1✔
269
      }
270
      cause = cause.getCause();
×
271
    }
272
    return Status.UNKNOWN.withDescription("unexpected exception").withCause(t)
×
273
        .asRuntimeException();
×
274
  }
275

276
  /**
277
   * Cancels a call, and throws the exception.
278
   *
279
   * @param t must be a RuntimeException or Error
280
   */
281
  private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
282
    try {
283
      call.cancel(null, t);
1✔
284
    } catch (RuntimeException | Error e) {
×
285
      logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
×
286
    }
1✔
287
    if (t instanceof RuntimeException) {
1✔
288
      throw (RuntimeException) t;
1✔
289
    } else if (t instanceof Error) {
×
290
      throw (Error) t;
×
291
    }
292
    // should be impossible
293
    throw new AssertionError(t);
×
294
  }
295

296
  private static <ReqT, RespT> void asyncUnaryRequestCall(
297
      ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver,
298
      boolean streamingResponse) {
299
    asyncUnaryRequestCall(
1✔
300
        call,
301
        req,
302
        new StreamObserverToCallListenerAdapter<>(
303
            responseObserver,
304
            new CallToStreamObserverAdapter<>(call, streamingResponse)));
305
  }
1✔
306

307
  private static <ReqT, RespT> void asyncUnaryRequestCall(
308
      ClientCall<ReqT, RespT> call,
309
      ReqT req,
310
      StartableListener<RespT> responseListener) {
311
    startCall(call, responseListener);
1✔
312
    try {
313
      call.sendMessage(req);
1✔
314
      call.halfClose();
1✔
315
    } catch (RuntimeException | Error e) {
×
316
      throw cancelThrow(call, e);
×
317
    }
1✔
318
  }
1✔
319

320
  private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
321
      ClientCall<ReqT, RespT> call,
322
      StreamObserver<RespT> responseObserver,
323
      boolean streamingResponse) {
324
    CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<>(
1✔
325
        call, streamingResponse);
326
    startCall(
1✔
327
        call,
328
        new StreamObserverToCallListenerAdapter<>(responseObserver, adapter));
329
    return adapter;
1✔
330
  }
331

332
  private static <ReqT, RespT> void startCall(
333
      ClientCall<ReqT, RespT> call,
334
      StartableListener<RespT> responseListener) {
335
    call.start(responseListener, new Metadata());
1✔
336
    responseListener.onStart();
1✔
337
  }
1✔
338

339
  private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
340
    abstract void onStart();
341
  }
342

343
  private static final class CallToStreamObserverAdapter<ReqT>
344
      extends ClientCallStreamObserver<ReqT> {
345
    private boolean frozen;
346
    private final ClientCall<ReqT, ?> call;
347
    private final boolean streamingResponse;
348
    private Runnable onReadyHandler;
349
    private int initialRequest = 1;
1✔
350
    private boolean autoRequestEnabled = true;
1✔
351
    private boolean aborted = false;
1✔
352
    private boolean completed = false;
1✔
353

354
    // Non private to avoid synthetic class
355
    CallToStreamObserverAdapter(ClientCall<ReqT, ?> call, boolean streamingResponse) {
1✔
356
      this.call = call;
1✔
357
      this.streamingResponse = streamingResponse;
1✔
358
    }
1✔
359

360
    private void freeze() {
361
      this.frozen = true;
1✔
362
    }
1✔
363

364
    @Override
365
    public void onNext(ReqT value) {
366
      checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
1✔
367
      checkState(!completed, "Stream is already completed, no further calls are allowed");
1✔
368
      call.sendMessage(value);
1✔
369
    }
1✔
370

371
    @Override
372
    public void onError(Throwable t) {
373
      call.cancel("Cancelled by client with StreamObserver.onError()", t);
1✔
374
      aborted = true;
1✔
375
    }
1✔
376

377
    @Override
378
    public void onCompleted() {
379
      call.halfClose();
1✔
380
      completed = true;
1✔
381
    }
1✔
382

383
    @Override
384
    public boolean isReady() {
385
      return call.isReady();
1✔
386
    }
387

388
    @Override
389
    public void setOnReadyHandler(Runnable onReadyHandler) {
390
      if (frozen) {
1✔
391
        throw new IllegalStateException(
1✔
392
            "Cannot alter onReadyHandler after call started. Use ClientResponseObserver");
393
      }
394
      this.onReadyHandler = onReadyHandler;
1✔
395
    }
1✔
396

397
    @Override
398
    public void disableAutoInboundFlowControl() {
399
      disableAutoRequestWithInitial(1);
×
400
    }
×
401

402
    @Override
403
    public void disableAutoRequestWithInitial(int request) {
404
      if (frozen) {
1✔
405
        throw new IllegalStateException(
×
406
            "Cannot disable auto flow control after call started. Use ClientResponseObserver");
407
      }
408
      Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
1✔
409
      initialRequest = request;
1✔
410
      autoRequestEnabled = false;
1✔
411
    }
1✔
412

413
    @Override
414
    public void request(int count) {
415
      if (!streamingResponse && count == 1) {
1✔
416
        // Initially ask for two responses from flow-control so that if a misbehaving server
417
        // sends more than one responses, we can catch it and fail it in the listener.
418
        call.request(2);
1✔
419
      } else {
420
        call.request(count);
1✔
421
      }
422
    }
1✔
423

424
    @Override
425
    public void setMessageCompression(boolean enable) {
426
      call.setMessageCompression(enable);
×
427
    }
×
428

429
    @Override
430
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
431
      call.cancel(message, cause);
×
432
    }
×
433
  }
434

435
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
436
      extends StartableListener<RespT> {
437
    private final StreamObserver<RespT> observer;
438
    private final CallToStreamObserverAdapter<ReqT> adapter;
439
    private boolean firstResponseReceived;
440

441
    // Non private to avoid synthetic class
442
    StreamObserverToCallListenerAdapter(
443
        StreamObserver<RespT> observer,
444
        CallToStreamObserverAdapter<ReqT> adapter) {
1✔
445
      this.observer = observer;
1✔
446
      this.adapter = adapter;
1✔
447
      if (observer instanceof ClientResponseObserver) {
1✔
448
        @SuppressWarnings("unchecked")
449
        ClientResponseObserver<ReqT, RespT> clientResponseObserver =
1✔
450
            (ClientResponseObserver<ReqT, RespT>) observer;
451
        clientResponseObserver.beforeStart(adapter);
1✔
452
      }
453
      adapter.freeze();
1✔
454
    }
1✔
455

456
    @Override
457
    public void onHeaders(Metadata headers) {
458
    }
1✔
459

460
    @Override
461
    public void onMessage(RespT message) {
462
      if (firstResponseReceived && !adapter.streamingResponse) {
1✔
463
        throw Status.INTERNAL
×
464
            .withDescription("More than one responses received for unary or client-streaming call")
×
465
            .asRuntimeException();
×
466
      }
467
      firstResponseReceived = true;
1✔
468
      observer.onNext(message);
1✔
469

470
      if (adapter.streamingResponse && adapter.autoRequestEnabled) {
1✔
471
        // Request delivery of the next inbound message.
472
        adapter.request(1);
1✔
473
      }
474
    }
1✔
475

476
    @Override
477
    public void onClose(Status status, Metadata trailers) {
478
      if (status.isOk()) {
1✔
479
        observer.onCompleted();
1✔
480
      } else {
481
        observer.onError(status.asRuntimeException(trailers));
1✔
482
      }
483
    }
1✔
484

485
    @Override
486
    public void onReady() {
487
      if (adapter.onReadyHandler != null) {
1✔
488
        adapter.onReadyHandler.run();
1✔
489
      }
490
    }
1✔
491

492
    @Override
493
    void onStart() {
494
      if (adapter.initialRequest > 0) {
1✔
495
        adapter.request(adapter.initialRequest);
1✔
496
      }
497
    }
1✔
498
  }
499

500
  /**
501
   * Completes a {@link GrpcFuture} using {@link StreamObserver} events.
502
   */
503
  private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
504
    private final GrpcFuture<RespT> responseFuture;
505
    private RespT value;
506
    private boolean isValueReceived = false;
1✔
507

508
    // Non private to avoid synthetic class
509
    UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) {
1✔
510
      this.responseFuture = responseFuture;
1✔
511
    }
1✔
512

513
    @Override
514
    public void onHeaders(Metadata headers) {
515
    }
1✔
516

517
    @Override
518
    public void onMessage(RespT value) {
519
      if (this.isValueReceived) {
1✔
520
        throw Status.INTERNAL.withDescription("More than one value received for unary call")
×
521
            .asRuntimeException();
×
522
      }
523
      this.value = value;
1✔
524
      this.isValueReceived = true;
1✔
525
    }
1✔
526

527
    @Override
528
    public void onClose(Status status, Metadata trailers) {
529
      if (status.isOk()) {
1✔
530
        if (!isValueReceived) {
1✔
531
          // No value received so mark the future as an error
532
          responseFuture.setException(
×
533
              Status.INTERNAL.withDescription("No value received for unary call")
×
534
                  .asRuntimeException(trailers));
×
535
        }
536
        responseFuture.set(value);
1✔
537
      } else {
538
        responseFuture.setException(status.asRuntimeException(trailers));
1✔
539
      }
540
    }
1✔
541

542
    @Override
543
    void onStart() {
544
      responseFuture.call.request(2);
1✔
545
    }
1✔
546
  }
547

548
  private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
549
    private final ClientCall<?, RespT> call;
550

551
    // Non private to avoid synthetic class
552
    GrpcFuture(ClientCall<?, RespT> call) {
1✔
553
      this.call = call;
1✔
554
    }
1✔
555

556
    @Override
557
    protected void interruptTask() {
558
      call.cancel("GrpcFuture was cancelled", null);
1✔
559
    }
1✔
560

561
    @Override
562
    protected boolean set(@Nullable RespT resp) {
563
      return super.set(resp);
1✔
564
    }
565

566
    @Override
567
    protected boolean setException(Throwable throwable) {
568
      return super.setException(throwable);
1✔
569
    }
570

571
    @SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped
572
    protected String pendingToString() {
573
      return MoreObjects.toStringHelper(this).add("clientCall", call).toString();
×
574
    }
575
  }
576

577
  /**
578
   * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}.
579
   *
580
   * <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
581
   * separate thread from {@link Iterator} calls.
582
   */
583
  // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
584
  private static final class BlockingResponseStream<T> implements Iterator<T> {
585
    // Due to flow control, only needs to hold up to 3 items: 2 for value, 1 for close.
586
    // (2 for value, not 1, because of early request() in next())
587
    private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
1✔
588
    private final StartableListener<T> listener = new QueuingListener();
1✔
589
    private final ClientCall<?, T> call;
590
    // Only accessed when iterating.
591
    private Object last;
592

593
    // Non private to avoid synthetic class
594
    BlockingResponseStream(ClientCall<?, T> call) {
1✔
595
      this.call = call;
1✔
596
    }
1✔
597

598
    StartableListener<T> listener() {
599
      return listener;
1✔
600
    }
601

602
    private Object waitForNext() {
603
      boolean interrupt = false;
1✔
604
      try {
605
        while (true) {
606
          try {
607
            return buffer.take();
1✔
608
          } catch (InterruptedException ie) {
1✔
609
            interrupt = true;
1✔
610
            call.cancel("Thread interrupted", ie);
1✔
611
            // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
612
          }
1✔
613
        }
614
      } finally {
615
        if (interrupt) {
1✔
616
          Thread.currentThread().interrupt();
1✔
617
        }
618
      }
619
    }
620

621
    @Override
622
    public boolean hasNext() {
623
      while (last == null) {
1✔
624
        // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
625
        // hangs here as the call will become closed.
626
        last = waitForNext();
1✔
627
      }
628
      if (last instanceof StatusRuntimeException) {
1✔
629
        // Rethrow the exception with a new stacktrace.
630
        StatusRuntimeException e = (StatusRuntimeException) last;
1✔
631
        throw e.getStatus().asRuntimeException(e.getTrailers());
1✔
632
      }
633
      return last != this;
1✔
634
    }
635

636
    @Override
637
    public T next() {
638
      // Eagerly call request(1) so it can be processing the next message while we wait for the
639
      // current one, which reduces latency for the next message. With MigratingThreadDeframer and
640
      // if the data has already been recieved, every other message can be delivered instantly. This
641
      // can be run after hasNext(), but just would be slower.
642
      if (!(last instanceof StatusRuntimeException) && last != this) {
1✔
643
        call.request(1);
1✔
644
      }
645
      if (!hasNext()) {
1✔
646
        throw new NoSuchElementException();
×
647
      }
648
      @SuppressWarnings("unchecked")
649
      T tmp = (T) last;
1✔
650
      last = null;
1✔
651
      return tmp;
1✔
652
    }
653

654
    @Override
655
    public void remove() {
656
      throw new UnsupportedOperationException();
×
657
    }
658

659
    private final class QueuingListener extends StartableListener<T> {
660
      // Non private to avoid synthetic class
661
      QueuingListener() {}
1✔
662

663
      private boolean done = false;
1✔
664

665
      @Override
666
      public void onHeaders(Metadata headers) {
667
      }
1✔
668

669
      @Override
670
      public void onMessage(T value) {
671
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
672
        buffer.add(value);
1✔
673
      }
1✔
674

675
      @Override
676
      public void onClose(Status status, Metadata trailers) {
677
        Preconditions.checkState(!done, "ClientCall already closed");
1✔
678
        if (status.isOk()) {
1✔
679
          buffer.add(BlockingResponseStream.this);
1✔
680
        } else {
681
          buffer.add(status.asRuntimeException(trailers));
1✔
682
        }
683
        done = true;
1✔
684
      }
1✔
685

686
      @Override
687
      void onStart() {
688
        call.request(1);
1✔
689
      }
1✔
690
    }
691
  }
692

693
  @SuppressWarnings("serial")
694
  private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
695
      implements Executor {
696
    private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
1✔
697

698
    private static final Object SHUTDOWN = new Object(); // sentinel
1✔
699

700
    // Set to the calling thread while it's parked, SHUTDOWN on RPC completion
701
    private volatile Object waiter;
702

703
    // Non private to avoid synthetic class
704
    ThreadlessExecutor() {}
1✔
705

706
    /**
707
     * Waits until there is a Runnable, then executes it and all queued Runnables after it.
708
     * Must only be called by one thread at a time.
709
     */
710
    public void waitAndDrain() throws InterruptedException {
711
      throwIfInterrupted();
1✔
712
      Runnable runnable = poll();
1✔
713
      if (runnable == null) {
1✔
714
        waiter = Thread.currentThread();
1✔
715
        try {
716
          while ((runnable = poll()) == null) {
1✔
717
            LockSupport.park(this);
1✔
718
            throwIfInterrupted();
1✔
719
          }
720
        } finally {
721
          waiter = null;
1✔
722
        }
723
      }
724
      do {
725
        runQuietly(runnable);
1✔
726
      } while ((runnable = poll()) != null);
1✔
727
    }
1✔
728

729
    /**
730
     * Called after final call to {@link #waitAndDrain()}, from same thread.
731
     */
732
    public void shutdown() {
733
      waiter = SHUTDOWN;
1✔
734
      Runnable runnable;
735
      while ((runnable = poll()) != null) {
1✔
736
        runQuietly(runnable);
×
737
      }
738
    }
1✔
739

740
    private static void runQuietly(Runnable runnable) {
741
      try {
742
        runnable.run();
1✔
743
      } catch (Throwable t) {
×
744
        log.log(Level.WARNING, "Runnable threw exception", t);
×
745
      }
1✔
746
    }
1✔
747

748
    private static void throwIfInterrupted() throws InterruptedException {
749
      if (Thread.interrupted()) {
1✔
750
        throw new InterruptedException();
1✔
751
      }
752
    }
1✔
753

754
    @Override
755
    public void execute(Runnable runnable) {
756
      add(runnable);
1✔
757
      Object waiter = this.waiter;
1✔
758
      if (waiter != SHUTDOWN) {
1✔
759
        LockSupport.unpark((Thread) waiter); // no-op if null
1✔
760
      } else if (remove(runnable) && rejectRunnableOnExecutor) {
1✔
761
        throw new RejectedExecutionException();
1✔
762
      }
763
    }
1✔
764
  }
765

766
  enum StubType {
1✔
767
    BLOCKING, FUTURE, ASYNC
1✔
768
  }
769

770
  /**
771
   * Internal {@link CallOptions.Key} to indicate stub types.
772
   */
773
  static final CallOptions.Key<StubType> STUB_TYPE_OPTION =
1✔
774
      CallOptions.Key.create("internal-stub-type");
1✔
775
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc