• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20124

22 Dec 2025 07:28PM UTC coverage: 88.72% (+0.001%) from 88.719%
#20124

push

github

ejona86
core: Improve DEADLINE_EXCEEDED message for CallCreds delays

DelayedStream is used both by DelayedClientTransport and
CallCredentialsApplyingTransport, but it wasn't clear from the error
which of the two was the cause of the delay. Now the two will have
different messages.

b/462499883

35450 of 39957 relevant lines covered (88.72%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.17
/../core/src/main/java/io/grpc/internal/DelayedStream.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.errorprone.annotations.CheckReturnValue;
24
import com.google.errorprone.annotations.concurrent.GuardedBy;
25
import io.grpc.Attributes;
26
import io.grpc.Compressor;
27
import io.grpc.Deadline;
28
import io.grpc.DecompressorRegistry;
29
import io.grpc.Metadata;
30
import io.grpc.Status;
31
import io.grpc.internal.ClientStreamListener.RpcProgress;
32
import java.io.InputStream;
33
import java.util.ArrayList;
34
import java.util.List;
35

36
/**
37
 * A stream that queues requests before the transport is available, and delegates to a real stream
38
 * implementation when the transport is available.
39
 *
40
 * <p>{@code ClientStream} itself doesn't require thread-safety. However, the state of {@code
41
 * DelayedStream} may be internally altered by different threads, thus internal synchronization is
42
 * necessary.
43
 */
44
class DelayedStream implements ClientStream {
1✔
45
  private final String bufferContext;
46
  /** {@code true} once realStream is valid and all pending calls have been drained. */
47
  private volatile boolean passThrough;
48
  /**
49
   * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
50
   * order, but also used if an error occurs before {@code realStream} is set.
51
   */
52
  private ClientStreamListener listener;
53
  /** Must hold {@code this} lock when setting. */
54
  private ClientStream realStream;
55
  @GuardedBy("this")
56
  private Status error;
57
  @GuardedBy("this")
1✔
58
  private List<Runnable> pendingCalls = new ArrayList<>();
59
  @GuardedBy("this")
60
  private DelayedStreamListener delayedListener;
61
  @GuardedBy("this")
62
  private long startTimeNanos;
63
  @GuardedBy("this")
64
  private long streamSetTimeNanos;
65
  // No need to synchronize; start() synchronization provides a happens-before
66
  private List<Runnable> preStartPendingCalls = new ArrayList<>();
1✔
67

68
  /**
69
   * Create a delayed stream with debug context {@code bufferContext}. The context is what this
70
   * stream is delayed by (e.g., "connecting", "call_credentials").
71
   */
72
  public DelayedStream(String bufferContext) {
1✔
73
    this.bufferContext = checkNotNull(bufferContext, "bufferContext");
1✔
74
  }
1✔
75

76
  @Override
77
  public void setMaxInboundMessageSize(final int maxSize) {
78
    checkState(listener == null, "May only be called before start");
1✔
79
    preStartPendingCalls.add(new Runnable() {
1✔
80
      @Override
81
      public void run() {
82
        realStream.setMaxInboundMessageSize(maxSize);
1✔
83
      }
1✔
84
    });
85
  }
1✔
86

87
  @Override
88
  public void setMaxOutboundMessageSize(final int maxSize) {
89
    checkState(listener == null, "May only be called before start");
1✔
90
    preStartPendingCalls.add(new Runnable() {
1✔
91
      @Override
92
      public void run() {
93
        realStream.setMaxOutboundMessageSize(maxSize);
1✔
94
      }
1✔
95
    });
96
  }
1✔
97

98
  @Override
99
  public void setDeadline(final Deadline deadline) {
100
    checkState(listener == null, "May only be called before start");
1✔
101
    preStartPendingCalls.add(new Runnable() {
1✔
102
      @Override
103
      public void run() {
104
        realStream.setDeadline(deadline);
1✔
105
      }
1✔
106
    });
107
  }
1✔
108

109
  @Override
110
  public void appendTimeoutInsight(InsightBuilder insight) {
111
    synchronized (this) {
1✔
112
      if (listener == null) {
1✔
113
        return;
1✔
114
      }
115
      if (realStream != null) {
1✔
116
        insight.appendKeyValue(
1✔
117
            bufferContext + "_delay", "" + (streamSetTimeNanos - startTimeNanos) + "ns");
118
        realStream.appendTimeoutInsight(insight);
1✔
119
      } else {
120
        insight.appendKeyValue(
1✔
121
            bufferContext + "_delay", "" + (System.nanoTime() - startTimeNanos) + "ns");
1✔
122
        insight.append("was_still_waiting");
1✔
123
      }
124
    }
1✔
125
  }
1✔
126

127
  /**
128
   * Transfers all pending and future requests and mutations to the given stream. Method will return
129
   * quickly, but if the returned Runnable is non-null it must be called to complete the process.
130
   * The Runnable may take a while to execute.
131
   *
132
   * <p>No-op if either this method or {@link #cancel} have already been called.
133
   */
134
  // When this method returns, start() has been called on realStream or passThrough is guaranteed to
135
  // be true
136
  @CheckReturnValue
137
  final Runnable setStream(ClientStream stream) {
138
    ClientStreamListener savedListener;
139
    synchronized (this) {
1✔
140
      // If realStream != null, then either setStream() or cancel() has been called.
141
      if (realStream != null) {
1✔
142
        return null;
1✔
143
      }
144
      setRealStream(checkNotNull(stream, "stream"));
1✔
145
      savedListener = listener;
1✔
146
      if (savedListener == null) {
1✔
147
        assert pendingCalls.isEmpty();
1✔
148
        pendingCalls = null;
1✔
149
        passThrough = true;
1✔
150
      }
151
    }
1✔
152
    if (savedListener == null) {
1✔
153
      return null;
1✔
154
    } else {
155
      internalStart(savedListener);
1✔
156
      return new Runnable() {
1✔
157
        @Override
158
        public void run() {
159
          drainPendingCalls();
1✔
160
        }
1✔
161
      };
162
    }
163
  }
164

165
  /**
166
   * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
167
   * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
168
   * should not be held when calling this method.
169
   */
170
  private void drainPendingCalls() {
171
    assert realStream != null;
1✔
172
    assert !passThrough;
1✔
173
    List<Runnable> toRun = new ArrayList<>();
1✔
174
    DelayedStreamListener delayedListener = null;
1✔
175
    while (true) {
176
      synchronized (this) {
1✔
177
        if (pendingCalls.isEmpty()) {
1✔
178
          pendingCalls = null;
1✔
179
          passThrough = true;
1✔
180
          delayedListener = this.delayedListener;
1✔
181
          break;
1✔
182
        }
183
        // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
184
        // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
185
        // drop the lock. So we will have to re-check pendingCalls.
186
        List<Runnable> tmp = toRun;
1✔
187
        toRun = pendingCalls;
1✔
188
        pendingCalls = tmp;
1✔
189
      }
1✔
190
      for (Runnable runnable : toRun) {
1✔
191
        // Must not call transport while lock is held to prevent deadlocks.
192
        // TODO(ejona): exception handling
193
        runnable.run();
1✔
194
      }
1✔
195
      toRun.clear();
1✔
196
    }
197
    if (delayedListener != null) {
1✔
198
      delayedListener.drainPendingCallbacks();
1✔
199
    }
200
  }
1✔
201

202
  /**
203
   * Enqueue the runnable or execute it now. Call sites that may be called many times may want avoid
204
   * this method if {@code passThrough == true}.
205
   *
206
   * <p>Note that this method is no more thread-safe than {@code runnable}. It is thread-safe if and
207
   * only if {@code runnable} is thread-safe.
208
   */
209
  private void delayOrExecute(Runnable runnable) {
210
    checkState(listener != null, "May only be called after start");
1✔
211
    synchronized (this) {
1✔
212
      if (!passThrough) {
1✔
213
        pendingCalls.add(runnable);
1✔
214
        return;
1✔
215
      }
216
    }
1✔
217
    runnable.run();
1✔
218
  }
1✔
219

220
  @Override
221
  public void setAuthority(final String authority) {
222
    checkNotNull(authority, "authority");
1✔
223
    preStartPendingCalls.add(new Runnable() {
1✔
224
      @Override
225
      public void run() {
226
        realStream.setAuthority(authority);
1✔
227
      }
1✔
228
    });
229
  }
1✔
230

231
  @Override
232
  public void start(ClientStreamListener listener) {
233
    checkNotNull(listener, "listener");
1✔
234
    checkState(this.listener == null, "already started");
1✔
235

236
    Status savedError;
237
    boolean savedPassThrough;
238
    synchronized (this) {
1✔
239
      // If error != null, then cancel() has been called and was unable to close the listener
240
      savedError = error;
1✔
241
      savedPassThrough = passThrough;
1✔
242
      if (!savedPassThrough) {
1✔
243
        listener = delayedListener = new DelayedStreamListener(listener);
1✔
244
      }
245
      this.listener = listener;
1✔
246
      startTimeNanos = System.nanoTime();
1✔
247
    }
1✔
248
    if (savedError != null) {
1✔
249
      listener.closed(savedError, RpcProgress.PROCESSED, new Metadata());
×
250
      return;
×
251
    }
252

253
    if (savedPassThrough) {
1✔
254
      internalStart(listener);
1✔
255
    } // else internalStart() will be called by setStream
256
  }
1✔
257

258
  /**
259
   * Starts stream without synchronization. {@code listener} should be same instance as {@link
260
   * #listener}.
261
   */
262
  private void internalStart(ClientStreamListener listener) {
263
    for (Runnable runnable : preStartPendingCalls) {
1✔
264
      runnable.run();
1✔
265
    }
1✔
266
    preStartPendingCalls = null;
1✔
267
    realStream.start(listener);
1✔
268
  }
1✔
269

270
  @Override
271
  public Attributes getAttributes() {
272
    ClientStream savedRealStream;
273
    synchronized (this) {
1✔
274
      savedRealStream = realStream;
1✔
275
    }
1✔
276
    if (savedRealStream != null) {
1✔
277
      return savedRealStream.getAttributes();
1✔
278
    } else {
279
      return Attributes.EMPTY;
1✔
280
    }
281
  }
282

283
  @Override
284
  public void writeMessage(final InputStream message) {
285
    checkState(listener != null, "May only be called after start");
1✔
286
    checkNotNull(message, "message");
1✔
287
    if (passThrough) {
1✔
288
      realStream.writeMessage(message);
1✔
289
    } else {
290
      delayOrExecute(new Runnable() {
1✔
291
        @Override
292
        public void run() {
293
          realStream.writeMessage(message);
1✔
294
        }
1✔
295
      });
296
    }
297
  }
1✔
298

299
  @Override
300
  public void flush() {
301
    checkState(listener != null, "May only be called after start");
1✔
302
    if (passThrough) {
1✔
303
      realStream.flush();
1✔
304
    } else {
305
      delayOrExecute(new Runnable() {
1✔
306
        @Override
307
        public void run() {
308
          realStream.flush();
1✔
309
        }
1✔
310
      });
311
    }
312
  }
1✔
313

314
  // When this method returns, passThrough is guaranteed to be true
315
  @Override
316
  public void cancel(final Status reason) {
317
    checkState(listener != null, "May only be called after start");
1✔
318
    checkNotNull(reason, "reason");
1✔
319
    boolean delegateToRealStream = true;
1✔
320
    synchronized (this) {
1✔
321
      // If realStream != null, then either setStream() or cancel() has been called
322
      if (realStream == null) {
1✔
323
        setRealStream(NoopClientStream.INSTANCE);
1✔
324
        delegateToRealStream = false;
1✔
325
        error = reason;
1✔
326
      }
327
    }
1✔
328
    if (delegateToRealStream) {
1✔
329
      delayOrExecute(new Runnable() {
1✔
330
        @Override
331
        public void run() {
332
          realStream.cancel(reason);
1✔
333
        }
1✔
334
      });
335
    } else {
336
      drainPendingCalls();
1✔
337
      onEarlyCancellation(reason);
1✔
338
      // Note that listener is a DelayedStreamListener
339
      listener.closed(reason, RpcProgress.PROCESSED, new Metadata());
1✔
340
    }
341
  }
1✔
342

343
  protected void onEarlyCancellation(Status reason) {
344
  }
1✔
345

346
  @GuardedBy("this")
347
  private void setRealStream(ClientStream realStream) {
348
    checkState(this.realStream == null, "realStream already set to %s", this.realStream);
1✔
349
    this.realStream = realStream;
1✔
350
    streamSetTimeNanos = System.nanoTime();
1✔
351
  }
1✔
352

353
  @Override
354
  public void halfClose() {
355
    checkState(listener != null, "May only be called after start");
1✔
356
    delayOrExecute(new Runnable() {
1✔
357
      @Override
358
      public void run() {
359
        realStream.halfClose();
1✔
360
      }
1✔
361
    });
362
  }
1✔
363

364
  @Override
365
  public void request(final int numMessages) {
366
    checkState(listener != null, "May only be called after start");
1✔
367
    if (passThrough) {
1✔
368
      realStream.request(numMessages);
1✔
369
    } else {
370
      delayOrExecute(new Runnable() {
1✔
371
        @Override
372
        public void run() {
373
          realStream.request(numMessages);
1✔
374
        }
1✔
375
      });
376
    }
377
  }
1✔
378

379
  @Override
380
  public void optimizeForDirectExecutor() {
381
    checkState(listener == null, "May only be called before start");
1✔
382
    preStartPendingCalls.add(new Runnable() {
1✔
383
      @Override
384
      public void run() {
385
        realStream.optimizeForDirectExecutor();
1✔
386
      }
1✔
387
    });
388
  }
1✔
389

390
  @Override
391
  public void setCompressor(final Compressor compressor) {
392
    checkState(listener == null, "May only be called before start");
1✔
393
    checkNotNull(compressor, "compressor");
1✔
394
    preStartPendingCalls.add(new Runnable() {
1✔
395
      @Override
396
      public void run() {
397
        realStream.setCompressor(compressor);
1✔
398
      }
1✔
399
    });
400
  }
1✔
401

402
  @Override
403
  public void setFullStreamDecompression(final boolean fullStreamDecompression) {
404
    checkState(listener == null, "May only be called before start");
1✔
405
    preStartPendingCalls.add(
1✔
406
        new Runnable() {
1✔
407
          @Override
408
          public void run() {
409
            realStream.setFullStreamDecompression(fullStreamDecompression);
1✔
410
          }
1✔
411
        });
412
  }
1✔
413

414
  @Override
415
  public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
416
    checkState(listener == null, "May only be called before start");
1✔
417
    checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
418
    preStartPendingCalls.add(new Runnable() {
1✔
419
      @Override
420
      public void run() {
421
        realStream.setDecompressorRegistry(decompressorRegistry);
1✔
422
      }
1✔
423
    });
424
  }
1✔
425

426
  @Override
427
  public boolean isReady() {
428
    if (passThrough) {
1✔
429
      return realStream.isReady();
1✔
430
    } else {
431
      return false;
1✔
432
    }
433
  }
434

435
  @Override
436
  public void setMessageCompression(final boolean enable) {
437
    checkState(listener != null, "May only be called after start");
1✔
438
    if (passThrough) {
1✔
439
      realStream.setMessageCompression(enable);
1✔
440
    } else {
441
      delayOrExecute(new Runnable() {
1✔
442
        @Override
443
        public void run() {
444
          realStream.setMessageCompression(enable);
1✔
445
        }
1✔
446
      });
447
    }
448
  }
1✔
449

450
  @VisibleForTesting
451
  ClientStream getRealStream() {
452
    return realStream;
1✔
453
  }
454

455
  private static class DelayedStreamListener implements ClientStreamListener {
1✔
456
    private final ClientStreamListener realListener;
457
    private volatile boolean passThrough;
458
    @GuardedBy("this")
1✔
459
    private List<Runnable> pendingCallbacks = new ArrayList<>();
460

461
    public DelayedStreamListener(ClientStreamListener listener) {
1✔
462
      this.realListener = listener;
1✔
463
    }
1✔
464

465
    private void delayOrExecute(Runnable runnable) {
466
      synchronized (this) {
1✔
467
        if (!passThrough) {
1✔
468
          pendingCallbacks.add(runnable);
1✔
469
          return;
1✔
470
        }
471
      }
1✔
472
      runnable.run();
1✔
473
    }
1✔
474

475
    @Override
476
    public void messagesAvailable(final MessageProducer producer) {
477
      if (passThrough) {
1✔
478
        realListener.messagesAvailable(producer);
1✔
479
      } else {
480
        delayOrExecute(new Runnable() {
1✔
481
          @Override
482
          public void run() {
483
            realListener.messagesAvailable(producer);
1✔
484
          }
1✔
485
        });
486
      }
487
    }
1✔
488

489
    @Override
490
    public void onReady() {
491
      if (passThrough) {
1✔
492
        realListener.onReady();
1✔
493
      } else {
494
        delayOrExecute(new Runnable() {
1✔
495
          @Override
496
          public void run() {
497
            realListener.onReady();
1✔
498
          }
1✔
499
        });
500
      }
501
    }
1✔
502

503
    @Override
504
    public void headersRead(final Metadata headers) {
505
      delayOrExecute(new Runnable() {
1✔
506
        @Override
507
        public void run() {
508
          realListener.headersRead(headers);
1✔
509
        }
1✔
510
      });
511
    }
1✔
512

513
    @Override
514
    public void closed(
515
        final Status status, final RpcProgress rpcProgress,
516
        final Metadata trailers) {
517
      delayOrExecute(new Runnable() {
1✔
518
        @Override
519
        public void run() {
520
          realListener.closed(status, rpcProgress, trailers);
1✔
521
        }
1✔
522
      });
523
    }
1✔
524

525
    public void drainPendingCallbacks() {
526
      assert !passThrough;
1✔
527
      List<Runnable> toRun = new ArrayList<>();
1✔
528
      while (true) {
529
        synchronized (this) {
1✔
530
          if (pendingCallbacks.isEmpty()) {
1✔
531
            pendingCallbacks = null;
1✔
532
            passThrough = true;
1✔
533
            break;
1✔
534
          }
535
          // Since there were pendingCallbacks, we need to process them. To maintain ordering we
536
          // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
537
          // added after we drop the lock. So we will have to re-check pendingCallbacks.
538
          List<Runnable> tmp = toRun;
1✔
539
          toRun = pendingCallbacks;
1✔
540
          pendingCallbacks = tmp;
1✔
541
        }
1✔
542
        for (Runnable runnable : toRun) {
1✔
543
          // Avoid calling listener while lock is held to prevent deadlocks.
544
          // TODO(ejona): exception handling
545
          runnable.run();
1✔
546
        }
1✔
547
        toRun.clear();
1✔
548
      }
549
    }
1✔
550
  }
551
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc