• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19930

29 Jul 2025 04:33PM UTC coverage: 88.588% (+0.007%) from 88.581%
#19930

push

github

web-flow
stub: simplify BlockingClientCall infinite blocking (#12217)

Move deadline computation into overloads with finite timeouts. Blocking calls without timeouts now do not have to read the clock.

34654 of 39118 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.21
/../stub/src/main/java/io/grpc/stub/BlockingClientCall.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.base.Preconditions;
21
import com.google.common.base.Predicate;
22
import io.grpc.ClientCall;
23
import io.grpc.ExperimentalApi;
24
import io.grpc.Metadata;
25
import io.grpc.Status;
26
import io.grpc.StatusException;
27
import io.grpc.stub.ClientCalls.ThreadSafeThreadlessExecutor;
28
import java.util.concurrent.ArrayBlockingQueue;
29
import java.util.concurrent.BlockingQueue;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.TimeoutException;
32
import java.util.logging.Level;
33
import java.util.logging.Logger;
34

35
/**
36
 * Represents a bidirectional streaming call from a client.  Allows in a blocking manner, sending
37
 * over the stream and receiving from the stream.  Also supports terminating the call.
38
 * Wraps a ClientCall and converts from async communication to the sync paradigm used by the
39
 * various blocking stream methods in {@link ClientCalls} which are used by the generated stubs.
40
 *
41
 * <p>Supports separate threads for reads and writes, but only 1 of each
42
 *
43
 * <p>Read methods consist of:
44
 * <ul>
45
 *   <li>{@link #read()}
46
 *   <li>{@link #read(long timeout, TimeUnit unit)}
47
 *   <li>{@link #hasNext()}
48
 *   <li>{@link #cancel(String, Throwable)}
49
 * </ul>
50
 *
51
 * <p>Write methods consist of:
52
 * <ul>
53
 *   <li>{@link #write(Object)}
54
 *   <li>{@link #write(Object, long timeout, TimeUnit unit)}
55
 *   <li>{@link #halfClose()}
56
 * </ul>
57
 *
58
 * @param <ReqT> Type of the Request Message
59
 * @param <RespT> Type of the Response Message
60
 */
61
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
62
public final class BlockingClientCall<ReqT, RespT> {
63

64
  private static final Logger logger = Logger.getLogger(BlockingClientCall.class.getName());
1✔
65

66
  private final BlockingQueue<RespT> buffer;
67
  private final ClientCall<ReqT, RespT> call;
68

69
  private final ThreadSafeThreadlessExecutor executor;
70

71
  private boolean writeClosed;
72
  private volatile Status closedStatus; // null if not closed
73

74
  BlockingClientCall(ClientCall<ReqT, RespT> call, ThreadSafeThreadlessExecutor executor) {
1✔
75
    this.call = call;
1✔
76
    this.executor = executor;
1✔
77
    buffer = new ArrayBlockingQueue<>(1);
1✔
78
  }
1✔
79

80
  /**
81
   * Wait if necessary for a value to be available from the server. If there is an available value
82
   * return it immediately, if the stream is closed return a null. Otherwise, wait for a value to be
83
   * available or the stream to be closed
84
   *
85
   * @return value from server or null if stream has been closed
86
   * @throws StatusException If the stream has closed in an error state
87
   */
88
  public RespT read() throws InterruptedException, StatusException {
89
    try {
90
      return read(true, 0);
1✔
91
    } catch (TimeoutException e) {
×
92
      throw new AssertionError("should never happen", e);
×
93
    }
94
  }
95

96
  /**
97
   * Wait with timeout, if necessary, for a value to be available from the server. If there is an
98
   * available value, return it immediately.  If the stream is closed return a null. Otherwise, wait
99
   * for a value to be available, the stream to be closed or the timeout to expire.
100
   *
101
   * @param timeout how long to wait before giving up.  Values &lt;= 0 are no wait
102
   * @param unit a TimeUnit determining how to interpret the timeout parameter
103
   * @return value from server or null (if stream has been closed)
104
   * @throws TimeoutException if no read becomes ready before the specified timeout expires
105
   * @throws StatusException If the stream has closed in an error state
106
   */
107
  public RespT read(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
108
      StatusException {
109
    long endNanoTime = System.nanoTime() + unit.toNanos(timeout);
1✔
110
    return read(false, endNanoTime);
1✔
111
  }
112

113
  private RespT read(boolean waitForever, long endNanoTime)
114
      throws InterruptedException, TimeoutException, StatusException {
115
    Predicate<BlockingClientCall<ReqT, RespT>> predicate = BlockingClientCall::skipWaitingForRead;
1✔
116
    executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this);
1✔
117
    RespT bufferedValue = buffer.poll();
1✔
118

119
    if (logger.isLoggable(Level.FINER)) {
1✔
120
      logger.finer("Client Blocking read had value:  " + bufferedValue);
×
121
    }
122

123
    Status currentClosedStatus;
124
    if (bufferedValue != null) {
1✔
125
      call.request(1);
1✔
126
      return bufferedValue;
1✔
127
    } else if ((currentClosedStatus = closedStatus) == null) {
1✔
128
      throw new IllegalStateException(
×
129
          "The message disappeared... are you reading from multiple threads?");
130
    } else if (!currentClosedStatus.isOk()) {
1✔
131
      throw currentClosedStatus.asException();
1✔
132
    } else {
133
      return null;
1✔
134
    }
135
  }
136

137
  boolean skipWaitingForRead() {
138
    return closedStatus != null || !buffer.isEmpty();
1✔
139
  }
140

141
  /**
142
   * Wait for a value to be available from the server. If there is an
143
   * available value, return true immediately.  If the stream was closed with Status.OK, return
144
   * false.  If the stream was closed with an error status, throw a StatusException. Otherwise, wait
145
   * for a value to be available or the stream to be closed.
146
   *
147
   * @return True when there is a value to read.  Return false if stream closed cleanly.
148
   * @throws StatusException If the stream was closed in an error state
149
   */
150
  public boolean hasNext() throws InterruptedException, StatusException {
151
    executor.waitAndDrain((x) -> !x.buffer.isEmpty() || x.closedStatus != null, this);
×
152

153
    Status currentClosedStatus = closedStatus;
×
154
    if (currentClosedStatus != null && !currentClosedStatus.isOk()) {
×
155
      throw currentClosedStatus.asException();
×
156
    }
157

158
    return !buffer.isEmpty();
×
159
  }
160

161
  /**
162
   * Send a value to the stream for sending to server, wait if necessary for the grpc stream to be
163
   * ready.
164
   *
165
   * <p>If write is not legal at the time of call, immediately returns false
166
   *
167
   * <p><br><b>NOTE: </b>This method will return as soon as it passes the request to the grpc stream
168
   * layer. It will not block while the message is being sent on the wire and returning true does
169
   * not guarantee that the server gets the message.
170
   *
171
   * <p><br><b>WARNING: </b>Doing only writes without reads can lead to deadlocks.  This is because
172
   * flow control, imposed by networks to protect intermediary routers and endpoints that are
173
   * operating under resource constraints, requires reads to be done in order to progress writes.
174
   * Furthermore, the server closing the stream will only be identified after
175
   * the last sent value is read.
176
   *
177
   * @param request Message to send to the server
178
   * @return true if the request is sent to stream, false if skipped
179
   * @throws StatusException If the stream has closed in an error state
180
   */
181
  public boolean write(ReqT request) throws InterruptedException, StatusException {
182
    try {
183
      return write(true, request, 0);
1✔
184
    } catch (TimeoutException e) {
×
185
      throw new RuntimeException(e); // should never happen
×
186
    }
187
  }
188

189
  /**
190
   * Send a value to the stream for sending to server, wait if necessary for the grpc stream to be
191
   * ready up to specified timeout.
192
   *
193
   * <p>If write is not legal at the time of call, immediately returns false
194
   *
195
   * <p><br><b>NOTE: </b>This method will return as soon as it passes the request to the grpc stream
196
   * layer. It will not block while the message is being sent on the wire and returning true does
197
   * not guarantee that the server gets the message.
198
   *
199
   * <p><br><b>WARNING: </b>Doing only writes without reads can lead to deadlocks as a result of
200
   * flow control.  Furthermore, the server closing the stream will only be identified after the
201
   * last sent value is read.
202
   *
203
   * @param request Message to send to the server
204
   * @param timeout How long to wait before giving up.  Values &lt;= 0 are no wait
205
   * @param unit A TimeUnit determining how to interpret the timeout parameter
206
   * @return true if the request is sent to stream, false if skipped
207
   * @throws TimeoutException if write does not become ready before the specified timeout expires
208
   * @throws StatusException If the stream has closed in an error state
209
   */
210
  public boolean write(ReqT request, long timeout, TimeUnit unit)
211
      throws InterruptedException, TimeoutException, StatusException {
212
    long endNanoTime = System.nanoTime() + unit.toNanos(timeout);
1✔
213
    return write(false, request, endNanoTime);
1✔
214
  }
215

216
  private boolean write(boolean waitForever, ReqT request, long endNanoTime)
217
      throws InterruptedException, TimeoutException, StatusException {
218

219
    if (writeClosed) {
1✔
220
      throw new IllegalStateException("Writes cannot be done after calling halfClose or cancel");
1✔
221
    }
222

223
    Predicate<BlockingClientCall<ReqT, RespT>> predicate =
1✔
224
        (x) -> x.call.isReady() || x.closedStatus != null;
1✔
225
    executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this);
1✔
226
    Status savedClosedStatus = closedStatus;
1✔
227
    if (savedClosedStatus == null) {
1✔
228
      call.sendMessage(request);
1✔
229
      return true;
1✔
230
    } else if (savedClosedStatus.isOk()) {
1✔
231
      return false;
×
232
    } else {
233
      // Propagate any errors returned from the server
234
      throw savedClosedStatus.asException();
1✔
235
    }
236
  }
237

238
  void sendSingleRequest(ReqT request) {
239
    call.sendMessage(request);
×
240
  }
×
241

242
  /**
243
   * Cancel stream and stop any further writes.  Note that some reads that are in flight may still
244
   * happen after the cancel.
245
   *
246
   * @param message if not {@code null}, will appear as the description of the CANCELLED status
247
   * @param cause if not {@code null}, will appear as the cause of the CANCELLED status
248
   */
249
  public void cancel(String message, Throwable cause) {
250
    writeClosed = true;
1✔
251
    call.cancel(message, cause);
1✔
252
  }
1✔
253

254
  /**
255
   * Indicate that no more writes will be done and the stream will be closed from the client side.
256
   *
257
   * @see ClientCall#halfClose()
258
   */
259
  public void halfClose() {
260
    if (writeClosed) {
1✔
261
      throw new IllegalStateException(
×
262
          "halfClose cannot be called after already half closed or cancelled");
263
    }
264

265
    writeClosed = true;
1✔
266
    call.halfClose();
1✔
267
  }
1✔
268

269
  /**
270
   * Status that server sent when closing channel from its side.
271
   *
272
   * @return null if stream not closed by server, otherwise Status sent by server
273
   */
274
  @VisibleForTesting
275
  Status getClosedStatus() {
276
    drainQuietly();
1✔
277
    return closedStatus;
1✔
278
  }
279

280
  /**
281
   * Check for whether some action is ready.
282
   *
283
   * @return True if legal to write and writeOrRead can run without blocking
284
   */
285
  @VisibleForTesting
286
  boolean isEitherReadOrWriteReady() {
287
    return (isWriteLegal() && isWriteReady()) || isReadReady();
1✔
288
  }
289

290
  /**
291
   * Check whether there are any values waiting to be read.
292
   *
293
   * @return true if read will not block
294
   */
295
  @VisibleForTesting
296
  boolean isReadReady() {
297
    drainQuietly();
1✔
298

299
    return !buffer.isEmpty();
1✔
300
  }
301

302
  /**
303
   * Check that write hasn't been marked complete and stream is ready to receive a write (so will
304
   * not block).
305
   *
306
   * @return true if legal to write and write will not block
307
   */
308
  @VisibleForTesting
309
  boolean isWriteReady() {
310
    drainQuietly();
1✔
311

312
    return isWriteLegal() && call.isReady();
1✔
313
  }
314

315
  /**
316
   * Check whether we'll ever be able to do writes or should terminate.
317
   * @return True if writes haven't been closed and the server hasn't closed the stream
318
   */
319
  private boolean isWriteLegal() {
320
    return !writeClosed && closedStatus == null;
1✔
321
  }
322

323
  ClientCall.Listener<RespT> getListener() {
324
    return new QueuingListener();
1✔
325
  }
326

327
  private void drainQuietly() {
328
    try {
329
      executor.drain();
1✔
330
    } catch (InterruptedException e) {
×
331
      Thread.currentThread().interrupt();
×
332
    }
1✔
333
  }
1✔
334

335
  private final class QueuingListener extends ClientCall.Listener<RespT> {
1✔
336
    @Override
337
    public void onMessage(RespT value) {
338
      Preconditions.checkState(closedStatus == null, "ClientCall already closed");
1✔
339
      buffer.add(value);
1✔
340
    }
1✔
341

342
    @Override
343
    public void onClose(Status status, Metadata trailers) {
344
      Preconditions.checkState(closedStatus == null, "ClientCall already closed");
1✔
345
      closedStatus = status;
1✔
346
    }
1✔
347
  }
348

349
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc