• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19937

06 Aug 2025 06:54AM UTC coverage: 88.597% (-0.001%) from 88.598%
#19937

push

github

web-flow
stub: use the closedTrailers in StatusException (#12259)

34676 of 39139 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/../stub/src/main/java/io/grpc/stub/BlockingClientCall.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.base.Preconditions;
21
import com.google.common.base.Predicate;
22
import io.grpc.ClientCall;
23
import io.grpc.ExperimentalApi;
24
import io.grpc.Metadata;
25
import io.grpc.Status;
26
import io.grpc.StatusException;
27
import io.grpc.stub.ClientCalls.ThreadSafeThreadlessExecutor;
28
import java.util.concurrent.ArrayBlockingQueue;
29
import java.util.concurrent.BlockingQueue;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.TimeoutException;
32
import java.util.concurrent.atomic.AtomicReference;
33
import java.util.logging.Level;
34
import java.util.logging.Logger;
35

36
/**
37
 * Represents a bidirectional streaming call from a client.  Allows sending over the stream and
 * receiving from the stream in a blocking manner.  Also supports terminating the call.
39
 * Wraps a ClientCall and converts from async communication to the sync paradigm used by the
40
 * various blocking stream methods in {@link ClientCalls} which are used by the generated stubs.
41
 *
42
 * <p>Supports separate threads for reads and writes, but only one of each.
43
 *
44
 * <p>Read methods consist of:
45
 * <ul>
46
 *   <li>{@link #read()}
47
 *   <li>{@link #read(long timeout, TimeUnit unit)}
48
 *   <li>{@link #hasNext()}
49
 *   <li>{@link #cancel(String, Throwable)}
50
 * </ul>
51
 *
52
 * <p>Write methods consist of:
53
 * <ul>
54
 *   <li>{@link #write(Object)}
55
 *   <li>{@link #write(Object, long timeout, TimeUnit unit)}
56
 *   <li>{@link #halfClose()}
57
 * </ul>
58
 *
59
 * @param <ReqT> Type of the Request Message
60
 * @param <RespT> Type of the Response Message
61
 */
62
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
63
public final class BlockingClientCall<ReqT, RespT> {
64

65
  private static final Logger logger = Logger.getLogger(BlockingClientCall.class.getName());
1✔
66

67
  private final BlockingQueue<RespT> buffer;
68
  private final ClientCall<ReqT, RespT> call;
69

70
  private final ThreadSafeThreadlessExecutor executor;
71

72
  private boolean writeClosed;
73
  private AtomicReference<CloseState> closeState = new AtomicReference<>();
1✔
74

75
  /**
   * Wraps {@code call} for blocking use.
   *
   * @param call the underlying call that messages are sent on and requested from
   * @param executor the executor that the blocking methods wait on and drain
   */
  BlockingClientCall(ClientCall<ReqT, RespT> call, ThreadSafeThreadlessExecutor executor) {
    // Capacity of one: at most a single received message is held at a time.
    this.buffer = new ArrayBlockingQueue<>(1);
    this.executor = executor;
    this.call = call;
  }
1✔
80

81
  /**
82
   * Wait if necessary for a value to be available from the server. If there is an available value
83
   * return it immediately, if the stream is closed return a null. Otherwise, wait for a value to be
84
   * available or the stream to be closed
85
   *
86
   * @return value from server or null if stream has been closed
87
   * @throws StatusException If the stream has closed in an error state
88
   */
89
  public RespT read() throws InterruptedException, StatusException {
90
    try {
91
      return read(true, 0);
1✔
92
    } catch (TimeoutException e) {
×
93
      throw new AssertionError("should never happen", e);
×
94
    }
95
  }
96

97
  /**
98
   * Wait with timeout, if necessary, for a value to be available from the server. If there is an
99
   * available value, return it immediately.  If the stream is closed return a null. Otherwise, wait
100
   * for a value to be available, the stream to be closed or the timeout to expire.
101
   *
102
   * @param timeout how long to wait before giving up.  Values &lt;= 0 are no wait
103
   * @param unit a TimeUnit determining how to interpret the timeout parameter
104
   * @return value from server or null (if stream has been closed)
105
   * @throws TimeoutException if no read becomes ready before the specified timeout expires
106
   * @throws StatusException If the stream has closed in an error state
107
   */
108
  public RespT read(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
109
      StatusException {
110
    long endNanoTime = System.nanoTime() + unit.toNanos(timeout);
1✔
111
    return read(false, endNanoTime);
1✔
112
  }
113

114
  private RespT read(boolean waitForever, long endNanoTime)
115
      throws InterruptedException, TimeoutException, StatusException {
116
    Predicate<BlockingClientCall<ReqT, RespT>> predicate = BlockingClientCall::skipWaitingForRead;
1✔
117
    executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this);
1✔
118
    RespT bufferedValue = buffer.poll();
1✔
119

120
    if (logger.isLoggable(Level.FINER)) {
1✔
121
      logger.finer("Client Blocking read had value:  " + bufferedValue);
×
122
    }
123

124
    CloseState currentCloseState;
125
    if (bufferedValue != null) {
1✔
126
      call.request(1);
1✔
127
      return bufferedValue;
1✔
128
    } else if ((currentCloseState = closeState.get()) == null) {
1✔
129
      throw new IllegalStateException(
×
130
          "The message disappeared... are you reading from multiple threads?");
131
    } else if (!currentCloseState.status.isOk()) {
1✔
132
      throw currentCloseState.status.asException(currentCloseState.trailers);
1✔
133
    } else {
134
      return null;
1✔
135
    }
136
  }
137

138
  boolean skipWaitingForRead() {
139
    return closeState.get() != null || !buffer.isEmpty();
1✔
140
  }
141

142
  /**
143
   * Wait for a value to be available from the server. If there is an
144
   * available value, return true immediately.  If the stream was closed with Status.OK, return
145
   * false.  If the stream was closed with an error status, throw a StatusException. Otherwise, wait
146
   * for a value to be available or the stream to be closed.
147
   *
148
   * @return True when there is a value to read.  Return false if stream closed cleanly.
149
   * @throws StatusException If the stream was closed in an error state
150
   */
151
  public boolean hasNext() throws InterruptedException, StatusException {
152
    executor.waitAndDrain((x) -> !x.buffer.isEmpty() || x.closeState.get() != null, this);
×
153

154
    CloseState currentCloseState = closeState.get();
×
155
    if (currentCloseState != null && !currentCloseState.status.isOk()) {
×
156
      throw currentCloseState.status.asException(currentCloseState.trailers);
×
157
    }
158

159
    return !buffer.isEmpty();
×
160
  }
161

162
  /**
163
   * Send a value to the stream for sending to server, wait if necessary for the grpc stream to be
164
   * ready.
165
   *
166
   * <p>If write is not legal at the time of call, immediately returns false
167
   *
168
   * <p><br><b>NOTE: </b>This method will return as soon as it passes the request to the grpc stream
169
   * layer. It will not block while the message is being sent on the wire and returning true does
170
   * not guarantee that the server gets the message.
171
   *
172
   * <p><br><b>WARNING: </b>Doing only writes without reads can lead to deadlocks.  This is because
173
   * flow control, imposed by networks to protect intermediary routers and endpoints that are
174
   * operating under resource constraints, requires reads to be done in order to progress writes.
175
   * Furthermore, the server closing the stream will only be identified after
176
   * the last sent value is read.
177
   *
178
   * @param request Message to send to the server
179
   * @return true if the request is sent to stream, false if skipped
180
   * @throws StatusException If the stream has closed in an error state
181
   */
182
  public boolean write(ReqT request) throws InterruptedException, StatusException {
183
    try {
184
      return write(true, request, 0);
1✔
185
    } catch (TimeoutException e) {
×
186
      throw new RuntimeException(e); // should never happen
×
187
    }
188
  }
189

190
  /**
191
   * Send a value to the stream for sending to server, wait if necessary for the grpc stream to be
192
   * ready up to specified timeout.
193
   *
194
   * <p>If write is not legal at the time of call, immediately returns false
195
   *
196
   * <p><br><b>NOTE: </b>This method will return as soon as it passes the request to the grpc stream
197
   * layer. It will not block while the message is being sent on the wire and returning true does
198
   * not guarantee that the server gets the message.
199
   *
200
   * <p><br><b>WARNING: </b>Doing only writes without reads can lead to deadlocks as a result of
201
   * flow control.  Furthermore, the server closing the stream will only be identified after the
202
   * last sent value is read.
203
   *
204
   * @param request Message to send to the server
205
   * @param timeout How long to wait before giving up.  Values &lt;= 0 are no wait
206
   * @param unit A TimeUnit determining how to interpret the timeout parameter
207
   * @return true if the request is sent to stream, false if skipped
208
   * @throws TimeoutException if write does not become ready before the specified timeout expires
209
   * @throws StatusException If the stream has closed in an error state
210
   */
211
  public boolean write(ReqT request, long timeout, TimeUnit unit)
212
      throws InterruptedException, TimeoutException, StatusException {
213
    long endNanoTime = System.nanoTime() + unit.toNanos(timeout);
1✔
214
    return write(false, request, endNanoTime);
1✔
215
  }
216

217
  private boolean write(boolean waitForever, ReqT request, long endNanoTime)
218
      throws InterruptedException, TimeoutException, StatusException {
219

220
    if (writeClosed) {
1✔
221
      throw new IllegalStateException("Writes cannot be done after calling halfClose or cancel");
1✔
222
    }
223

224
    Predicate<BlockingClientCall<ReqT, RespT>> predicate =
1✔
225
        (x) -> x.call.isReady() || x.closeState.get() != null;
1✔
226
    executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this);
1✔
227
    CloseState savedCloseState = closeState.get();
1✔
228
    if (savedCloseState == null || savedCloseState.status == null) {
1✔
229
      call.sendMessage(request);
1✔
230
      return true;
1✔
231
    } else if (savedCloseState.status.isOk()) {
1✔
232
      return false;
×
233
    } else {
234
      throw savedCloseState.status.asException(savedCloseState.trailers);
1✔
235
    }
236
  }
237

238
  void sendSingleRequest(ReqT request) {
239
    call.sendMessage(request);
×
240
  }
×
241

242
  /**
243
   * Cancel stream and stop any further writes.  Note that some reads that are in flight may still
244
   * happen after the cancel.
245
   *
246
   * @param message if not {@code null}, will appear as the description of the CANCELLED status
247
   * @param cause if not {@code null}, will appear as the cause of the CANCELLED status
248
   */
249
  public void cancel(String message, Throwable cause) {
250
    writeClosed = true;
1✔
251
    call.cancel(message, cause);
1✔
252
  }
1✔
253

254
  /**
255
   * Indicate that no more writes will be done and the stream will be closed from the client side.
256
   *
257
   * @see ClientCall#halfClose()
258
   */
259
  public void halfClose() {
260
    if (writeClosed) {
1✔
261
      throw new IllegalStateException(
×
262
          "halfClose cannot be called after already half closed or cancelled");
263
    }
264

265
    writeClosed = true;
1✔
266
    call.halfClose();
1✔
267
  }
1✔
268

269
  /**
270
   * Status that server sent when closing channel from its side.
271
   *
272
   * @return null if stream not closed by server, otherwise Status sent by server
273
   */
274
  @VisibleForTesting
275
  Status getClosedStatus() {
276
    drainQuietly();
1✔
277
    CloseState state = closeState.get();
1✔
278
    return (state == null) ? null : state.status;
1✔
279
  }
280

281
  /**
282
   * Check for whether some action is ready.
283
   *
284
   * @return True if legal to write and writeOrRead can run without blocking
285
   */
286
  @VisibleForTesting
287
  boolean isEitherReadOrWriteReady() {
288
    return (isWriteLegal() && isWriteReady()) || isReadReady();
1✔
289
  }
290

291
  /**
292
   * Check whether there are any values waiting to be read.
293
   *
294
   * @return true if read will not block
295
   */
296
  @VisibleForTesting
297
  boolean isReadReady() {
298
    drainQuietly();
1✔
299

300
    return !buffer.isEmpty();
1✔
301
  }
302

303
  /**
304
   * Check that write hasn't been marked complete and stream is ready to receive a write (so will
305
   * not block).
306
   *
307
   * @return true if legal to write and write will not block
308
   */
309
  @VisibleForTesting
310
  boolean isWriteReady() {
311
    drainQuietly();
1✔
312

313
    return isWriteLegal() && call.isReady();
1✔
314
  }
315

316
  /**
317
   * Check whether we'll ever be able to do writes or should terminate.
318
   * @return True if writes haven't been closed and the server hasn't closed the stream
319
   */
320
  private boolean isWriteLegal() {
321
    return !writeClosed && closeState.get() == null;
1✔
322
  }
323

324
  ClientCall.Listener<RespT> getListener() {
325
    return new QueuingListener();
1✔
326
  }
327

328
  private void drainQuietly() {
329
    try {
330
      executor.drain();
1✔
331
    } catch (InterruptedException e) {
×
332
      Thread.currentThread().interrupt();
×
333
    }
1✔
334
  }
1✔
335

336
  private final class QueuingListener extends ClientCall.Listener<RespT> {
1✔
337
    @Override
338
    public void onMessage(RespT value) {
339
      Preconditions.checkState(closeState.get() == null, "ClientCall already closed");
1✔
340
      buffer.add(value);
1✔
341
    }
1✔
342

343
    @Override
344
    public void onClose(Status status, Metadata trailers) {
345
      CloseState newCloseState = new CloseState(status, trailers);
1✔
346
      boolean wasSet = closeState.compareAndSet(null, newCloseState);
1✔
347
      Preconditions.checkState(wasSet, "ClientCall already closed");
1✔
348
    }
1✔
349
  }
350

351
  private static final class CloseState {
352
    final Status status;
353
    final Metadata trailers;
354

355
    CloseState(Status status, Metadata trailers) {
1✔
356
      this.status = Preconditions.checkNotNull(status, "status");
1✔
357
      this.trailers = trailers;
1✔
358
    }
1✔
359
  }
360
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc