• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20124

22 Dec 2025 07:28PM UTC coverage: 88.72% (+0.001%) from 88.719%
#20124

push

github

ejona86
core: Improve DEADLINE_EXCEEDED message for CallCreds delays

DelayedStream is used both by DelayedClientTransport and
CallCredentialsApplyingTransport, but it wasn't clear from the error
which of the two was the cause of the delay. Now the two will have
different messages.

b/462499883

35450 of 39957 relevant lines covered (88.72%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.35
/../core/src/main/java/io/grpc/internal/DelayedClientCall.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static java.util.concurrent.TimeUnit.NANOSECONDS;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.errorprone.annotations.concurrent.GuardedBy;
26
import io.grpc.Attributes;
27
import io.grpc.ClientCall;
28
import io.grpc.Context;
29
import io.grpc.Deadline;
30
import io.grpc.Metadata;
31
import io.grpc.Status;
32
import java.util.ArrayList;
33
import java.util.List;
34
import java.util.Locale;
35
import java.util.concurrent.Executor;
36
import java.util.concurrent.ScheduledExecutorService;
37
import java.util.concurrent.ScheduledFuture;
38
import java.util.concurrent.TimeUnit;
39
import java.util.logging.Level;
40
import java.util.logging.Logger;
41
import javax.annotation.Nullable;
42

43
/**
44
 * A call that queues requests before a real call is ready to be delegated to.
45
 *
46
 * <p>{@code ClientCall} itself doesn't require thread-safety. However, the state of {@code
47
 * DelayedCall} may be internally altered by different threads, thus internal synchronization is
48
 * necessary.
49
 */
50
public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
51
  private static final Logger logger = Logger.getLogger(DelayedClientCall.class.getName());
1✔
52
  /**
53
   * A timer to monitor the initial deadline. The timer must be cancelled on transition to the real
54
   * call.
55
   */
56
  @Nullable
57
  private final ScheduledFuture<?> initialDeadlineMonitor;
58
  private final Executor callExecutor;
59
  private final Context context;
60
  /** {@code true} once realCall is valid and all pending calls have been drained. */
61
  private volatile boolean passThrough;
62
  /**
63
   * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
64
   * order, but also used if an error occurs before {@code realCall} is set.
65
   */
66
  private Listener<RespT> listener;
67
  // No need to synchronize; start() synchronization provides a happens-before
68
  private Metadata startHeaders;
69
  // Must hold {@code this} lock when setting.
70
  private ClientCall<ReqT, RespT> realCall;
71
  @GuardedBy("this")
72
  private Status error;
73
  @GuardedBy("this")
1✔
74
  private List<Runnable> pendingRunnables = new ArrayList<>();
75
  @GuardedBy("this")
76
  private DelayedListener<RespT> delayedListener;
77

78
  protected DelayedClientCall(
79
      Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
1✔
80
    this.callExecutor = checkNotNull(callExecutor, "callExecutor");
1✔
81
    checkNotNull(scheduler, "scheduler");
1✔
82
    context = Context.current();
1✔
83
    initialDeadlineMonitor = scheduleDeadlineIfNeeded(scheduler, deadline);
1✔
84
  }
1✔
85

86
  // If one argument is null, consider the other the "Before"
87
  private boolean isAbeforeB(@Nullable Deadline a, @Nullable Deadline b) {
88
    if (b == null) {
1✔
89
      return true;
1✔
90
    } else if (a == null) {
×
91
      return false;
×
92
    }
93

94
    return a.isBefore(b);
×
95
  }
96

97
  @Nullable
98
  private ScheduledFuture<?> scheduleDeadlineIfNeeded(
99
      ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
100
    Deadline contextDeadline = context.getDeadline();
1✔
101
    String deadlineName;
102
    long remainingNanos;
103
    if (deadline != null && isAbeforeB(deadline, contextDeadline)) {
1✔
104
      deadlineName = "CallOptions";
1✔
105
      remainingNanos = deadline.timeRemaining(NANOSECONDS);
1✔
106
    } else if (contextDeadline != null) {
1✔
107
      deadlineName = "Context";
×
108
      remainingNanos = contextDeadline.timeRemaining(NANOSECONDS);
×
109
      if (logger.isLoggable(Level.FINE)) {
×
110
        StringBuilder builder =
×
111
            new StringBuilder(
112
                String.format(
×
113
                    Locale.US,
114
                    "Call timeout set to '%d' ns, due to context deadline.", remainingNanos));
×
115
        if (deadline == null) {
×
116
          builder.append(" Explicit call timeout was not set.");
×
117
        } else {
118
          long callTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS);
×
119
          builder.append(String.format(
×
120
              Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
×
121
        }
122
        logger.fine(builder.toString());
×
123
      }
×
124
    } else {
125
      return null;
1✔
126
    }
127

128
    /* Cancels the call if deadline exceeded prior to the real call being set. */
129
    class DeadlineExceededRunnable implements Runnable {
1✔
130
      @Override
131
      public void run() {
132
        long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
133
        long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
134
        StringBuilder buf = new StringBuilder();
1✔
135
        if (remainingNanos < 0) {
1✔
136
          buf.append("ClientCall started after ");
1✔
137
          buf.append(deadlineName);
1✔
138
          buf.append(" deadline was exceeded. Deadline has been exceeded for ");
1✔
139
        } else {
140
          buf.append("Deadline ");
1✔
141
          buf.append(deadlineName);
1✔
142
          buf.append(" was exceeded after ");
1✔
143
        }
144
        buf.append(seconds);
1✔
145
        buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
146
        buf.append("s");
1✔
147
        cancel(
1✔
148
            Status.DEADLINE_EXCEEDED.withDescription(buf.toString()),
1✔
149
            // We should not cancel the call if the realCall is set because there could be a
150
            // race between cancel() and realCall.start(). The realCall will handle deadline by
151
            // itself.
152
            /* onlyCancelPendingCall= */ true);
153
      }
1✔
154
    }
155

156
    return scheduler.schedule(new DeadlineExceededRunnable(), remainingNanos, NANOSECONDS);
1✔
157
  }
158

159
  /**
160
   * Transfers all pending and future requests and mutations to the given call.
161
   *
162
   * <p>No-op if either this method or {@link #cancel} have already been called.
163
   */
164
  // When this method returns, passThrough is guaranteed to be true
165
  public final Runnable setCall(ClientCall<ReqT, RespT> call) {
166
    Listener<RespT> savedDelayedListener;
167
    synchronized (this) {
1✔
168
      // If realCall != null, then either setCall() or cancel() has been called.
169
      if (realCall != null) {
1✔
170
        return null;
1✔
171
      }
172
      setRealCall(checkNotNull(call, "call"));
1✔
173
      // start() not yet called
174
      if (delayedListener == null) {
1✔
175
        assert pendingRunnables.isEmpty();
1✔
176
        pendingRunnables = null;
1✔
177
        passThrough = true;
1✔
178
        return null;
1✔
179
      }
180
      savedDelayedListener = this.delayedListener;
1✔
181
    }
1✔
182
    internalStart(savedDelayedListener);
1✔
183
    return new ContextRunnable(context) {
1✔
184
      @Override
185
      public void runInContext() {
186
        drainPendingCalls();
1✔
187
      }
1✔
188
    };
189
  }
190

191
  private void internalStart(Listener<RespT> listener) {
192
    Metadata savedStartHeaders = this.startHeaders;
1✔
193
    this.startHeaders = null;
1✔
194
    context.run(() -> realCall.start(listener, savedStartHeaders));
1✔
195
  }
1✔
196

197
  @Override
198
  public final void start(Listener<RespT> listener, final Metadata headers) {
199
    checkNotNull(headers, "headers");
1✔
200
    checkState(this.listener == null, "already started");
1✔
201
    Status savedError;
202
    boolean savedPassThrough;
203
    synchronized (this) {
1✔
204
      this.listener = checkNotNull(listener, "listener");
1✔
205
      // If error != null, then cancel() has been called and was unable to close the listener
206
      savedError = error;
1✔
207
      savedPassThrough = passThrough;
1✔
208
      if (!savedPassThrough) {
1✔
209
        listener = delayedListener = new DelayedListener<>(listener);
1✔
210
        startHeaders = headers;
1✔
211
      }
212
    }
1✔
213
    if (savedError != null) {
1✔
214
      callExecutor.execute(new CloseListenerRunnable(listener, savedError));
×
215
      return;
×
216
    }
217
    if (savedPassThrough) {
1✔
218
      realCall.start(listener, headers);
1✔
219
    } // else realCall.start() will be called by setCall
220
  }
1✔
221

222
  // When this method returns, passThrough is guaranteed to be true
223
  @Override
224
  public final void cancel(@Nullable final String message, @Nullable final Throwable cause) {
225
    Status status = Status.CANCELLED;
1✔
226
    if (message != null) {
1✔
227
      status = status.withDescription(message);
1✔
228
    } else {
229
      status = status.withDescription("Call cancelled without message");
1✔
230
    }
231
    if (cause != null) {
1✔
232
      status = status.withCause(cause);
1✔
233
    }
234
    cancel(status, false);
1✔
235
  }
1✔
236

237
  /**
238
   * Cancels the call unless {@code realCall} is set and {@code onlyCancelPendingCall} is true.
239
   */
240
  private void cancel(final Status status, boolean onlyCancelPendingCall) {
241
    boolean delegateToRealCall = true;
1✔
242
    Listener<RespT> listenerToClose = null;
1✔
243
    synchronized (this) {
1✔
244
      // If realCall != null, then either setCall() or cancel() has been called
245
      if (realCall == null) {
1✔
246
        @SuppressWarnings("unchecked")
247
        ClientCall<ReqT, RespT> noopCall = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
248
        setRealCall(noopCall);
1✔
249
        delegateToRealCall = false;
1✔
250
        // If listener == null, then start() will later call listener with 'error'
251
        listenerToClose = listener;
1✔
252
        error = status;
1✔
253
      } else if (onlyCancelPendingCall) {
1✔
254
        return;
×
255
      }
256
    }
1✔
257
    if (delegateToRealCall) {
1✔
258
      delayOrExecute(new Runnable() {
1✔
259
        @Override
260
        public void run() {
261
          realCall.cancel(status.getDescription(), status.getCause());
1✔
262
        }
1✔
263
      });
264
    } else {
265
      if (listenerToClose != null) {
1✔
266
        callExecutor.execute(new CloseListenerRunnable(listenerToClose, status));
1✔
267
      }
268
      internalStart(listenerToClose); // listener instance doesn't matter
1✔
269
      drainPendingCalls();
1✔
270
    }
271
    callCancelled();
1✔
272
  }
1✔
273

274
  protected void callCancelled() {
275
  }
1✔
276

277
  private void delayOrExecute(Runnable runnable) {
278
    synchronized (this) {
1✔
279
      if (!passThrough) {
1✔
280
        pendingRunnables.add(runnable);
1✔
281
        return;
1✔
282
      }
283
    }
1✔
284
    runnable.run();
1✔
285
  }
1✔
286

287
  /**
288
   * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
289
   * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
290
   * should not be held when calling this method.
291
   */
292
  private void drainPendingCalls() {
293
    assert realCall != null;
1✔
294
    assert !passThrough;
1✔
295
    List<Runnable> toRun = new ArrayList<>();
1✔
296
    DelayedListener<RespT> delayedListener ;
297
    while (true) {
298
      synchronized (this) {
1✔
299
        if (pendingRunnables.isEmpty()) {
1✔
300
          pendingRunnables = null;
1✔
301
          passThrough = true;
1✔
302
          delayedListener = this.delayedListener;
1✔
303
          break;
1✔
304
        }
305
        // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
306
        // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
307
        // drop the lock. So we will have to re-check pendingCalls.
308
        List<Runnable> tmp = toRun;
1✔
309
        toRun = pendingRunnables;
1✔
310
        pendingRunnables = tmp;
1✔
311
      }
1✔
312
      for (Runnable runnable : toRun) {
1✔
313
        // Must not call transport while lock is held to prevent deadlocks.
314
        // TODO(ejona): exception handling
315
        runnable.run();
1✔
316
      }
1✔
317
      toRun.clear();
1✔
318
    }
319
    if (delayedListener != null) {
1✔
320
      final DelayedListener<RespT> listener = delayedListener;
1✔
321
      class DrainListenerRunnable extends ContextRunnable {
322
        DrainListenerRunnable() {
1✔
323
          super(context);
1✔
324
        }
1✔
325

326
        @Override
327
        public void runInContext() {
328
          listener.drainPendingCallbacks();
1✔
329
        }
1✔
330
      }
331

332
      callExecutor.execute(new DrainListenerRunnable());
1✔
333
    }
334
  }
1✔
335

336
  @GuardedBy("this")
337
  private void setRealCall(ClientCall<ReqT, RespT> realCall) {
338
    checkState(this.realCall == null, "realCall already set to %s", this.realCall);
1✔
339
    if (initialDeadlineMonitor != null) {
1✔
340
      initialDeadlineMonitor.cancel(false);
1✔
341
    }
342
    this.realCall = realCall;
1✔
343
  }
1✔
344

345
  @VisibleForTesting
346
  final ClientCall<ReqT, RespT> getRealCall() {
347
    return realCall;
×
348
  }
349

350
  @Override
351
  public final void sendMessage(final ReqT message) {
352
    if (passThrough) {
1✔
353
      realCall.sendMessage(message);
1✔
354
    } else {
355
      delayOrExecute(new Runnable() {
1✔
356
        @Override
357
        public void run() {
358
          realCall.sendMessage(message);
1✔
359
        }
1✔
360
      });
361
    }
362
  }
1✔
363

364
  @Override
365
  public final void setMessageCompression(final boolean enable) {
366
    if (passThrough) {
1✔
367
      realCall.setMessageCompression(enable);
1✔
368
    } else {
369
      delayOrExecute(new Runnable() {
1✔
370
        @Override
371
        public void run() {
372
          realCall.setMessageCompression(enable);
1✔
373
        }
1✔
374
      });
375
    }
376
  }
1✔
377

378
  @Override
379
  public final void request(final int numMessages) {
380
    if (passThrough) {
1✔
381
      realCall.request(numMessages);
1✔
382
    } else {
383
      delayOrExecute(new Runnable() {
1✔
384
        @Override
385
        public void run() {
386
          realCall.request(numMessages);
1✔
387
        }
1✔
388
      });
389
    }
390
  }
1✔
391

392
  @Override
393
  public final void halfClose() {
394
    delayOrExecute(new Runnable() {
1✔
395
      @Override
396
      public void run() {
397
        realCall.halfClose();
1✔
398
      }
1✔
399
    });
400
  }
1✔
401

402
  @Override
403
  public final boolean isReady() {
404
    if (passThrough) {
1✔
405
      return realCall.isReady();
1✔
406
    } else {
407
      return false;
×
408
    }
409
  }
410

411
  @Override
412
  public final Attributes getAttributes() {
413
    ClientCall<ReqT, RespT> savedRealCall;
414
    synchronized (this) {
1✔
415
      savedRealCall = realCall;
1✔
416
    }
1✔
417
    if (savedRealCall != null) {
1✔
418
      return savedRealCall.getAttributes();
1✔
419
    } else {
420
      return Attributes.EMPTY;
×
421
    }
422
  }
423

424
  @Override
425
  public String toString() {
426
    return MoreObjects.toStringHelper(this)
×
427
        .add("realCall", realCall)
×
428
        .toString();
×
429
  }
430

431
  private final class CloseListenerRunnable extends ContextRunnable {
432
    final Listener<RespT> listener;
433
    final Status status;
434

435
    CloseListenerRunnable(Listener<RespT> listener, Status status) {
1✔
436
      super(context);
1✔
437
      this.listener = listener;
1✔
438
      this.status = status;
1✔
439
    }
1✔
440

441
    @Override
442
    public void runInContext() {
443
      listener.onClose(status, new Metadata());
1✔
444
    }
1✔
445
  }
446

447
  private static final class DelayedListener<RespT> extends Listener<RespT> {
1✔
448
    private final Listener<RespT> realListener;
449
    private volatile boolean passThrough;
450
    @GuardedBy("this")
1✔
451
    private List<Runnable> pendingCallbacks = new ArrayList<>();
452

453
    public DelayedListener(Listener<RespT> listener) {
1✔
454
      this.realListener = listener;
1✔
455
    }
1✔
456

457
    private void delayOrExecute(Runnable runnable) {
458
      synchronized (this) {
1✔
459
        if (!passThrough) {
1✔
460
          pendingCallbacks.add(runnable);
1✔
461
          return;
1✔
462
        }
463
      }
1✔
464
      runnable.run();
1✔
465
    }
1✔
466

467
    @Override
468
    public void onHeaders(final Metadata headers) {
469
      if (passThrough) {
1✔
470
        realListener.onHeaders(headers);
1✔
471
      } else {
472
        delayOrExecute(new Runnable() {
×
473
          @Override
474
          public void run() {
475
            realListener.onHeaders(headers);
×
476
          }
×
477
        });
478
      }
479
    }
1✔
480

481
    @Override
482
    public void onMessage(final RespT message) {
483
      if (passThrough) {
1✔
484
        realListener.onMessage(message);
1✔
485
      } else {
486
        delayOrExecute(new Runnable() {
×
487
          @Override
488
          public void run() {
489
            realListener.onMessage(message);
×
490
          }
×
491
        });
492
      }
493
    }
1✔
494

495
    @Override
496
    public void onClose(final Status status, final Metadata trailers) {
497
      delayOrExecute(new Runnable() {
1✔
498
        @Override
499
        public void run() {
500
          realListener.onClose(status, trailers);
1✔
501
        }
1✔
502
      });
503
    }
1✔
504

505
    @Override
506
    public void onReady() {
507
      if (passThrough) {
1✔
508
        realListener.onReady();
1✔
509
      } else {
510
        delayOrExecute(new Runnable() {
1✔
511
          @Override
512
          public void run() {
513
            realListener.onReady();
1✔
514
          }
1✔
515
        });
516
      }
517
    }
1✔
518

519
    void drainPendingCallbacks() {
520
      assert !passThrough;
1✔
521
      List<Runnable> toRun = new ArrayList<>();
1✔
522
      while (true) {
523
        synchronized (this) {
1✔
524
          if (pendingCallbacks.isEmpty()) {
1✔
525
            pendingCallbacks = null;
1✔
526
            passThrough = true;
1✔
527
            break;
1✔
528
          }
529
          // Since there were pendingCallbacks, we need to process them. To maintain ordering we
530
          // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
531
          // added after we drop the lock. So we will have to re-check pendingCallbacks.
532
          List<Runnable> tmp = toRun;
1✔
533
          toRun = pendingCallbacks;
1✔
534
          pendingCallbacks = tmp;
1✔
535
        }
1✔
536
        for (Runnable runnable : toRun) {
1✔
537
          // Avoid calling listener while lock is held to prevent deadlocks.
538
          // TODO(ejona): exception handling
539
          runnable.run();
1✔
540
        }
1✔
541
        toRun.clear();
1✔
542
      }
543
    }
1✔
544
  }
545

546
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
547
    @Override
548
    public void start(Listener<Object> responseListener, Metadata headers) {}
1✔
549

550
    @Override
551
    public void request(int numMessages) {}
1✔
552

553
    @Override
554
    public void cancel(String message, Throwable cause) {}
1✔
555

556
    @Override
557
    public void halfClose() {}
1✔
558

559
    @Override
560
    public void sendMessage(Object message) {}
1✔
561

562
    // Always returns {@code false}, since this is only used when the startup of the call fails.
563
    @Override
564
    public boolean isReady() {
565
      return false;
×
566
    }
567
  };
568
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc