• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20132

29 Dec 2025 04:07PM UTC coverage: 88.689% (+0.02%) from 88.666%
#20132

push

github

ejona86
bazel: Fix Bazel 8 WORKSPACE support

There's some rearranging happening here, like moving jar_jar later. But
the main thing is downgrading rules_java to 8.15.2. rules_java 8.16 and
later somehow break protobuf:

```
	File ".../external/com_google_protobuf/bazel/private/proto_library_rule.bzl", line 43, column 17, in _get_import_prefix
		if not paths.is_normalized(import_prefix):
Error: 'struct' value has no field or method 'is_normalized'
Available attributes: basename, dirname, is_absolute, join, normalize, relativize, replace_extension, split_extension
```

https://github.com/protocolbuffers/protobuf/issues/17687 claims that
this is due to not using bazel_skylib 1.7.0, but protobuf_deps is
defining bazel_skylib to be 1.7.0 and nothing earlier seems to be
defining bazel_skylib. So we'll leave this as a bit of a mystery for
later. rules_java 8.15.2 is still newer than protobuf_deps's 8.6.1.

35471 of 39995 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.14
/../core/src/main/java/io/grpc/internal/DelayedClientCall.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static java.util.concurrent.TimeUnit.NANOSECONDS;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.errorprone.annotations.concurrent.GuardedBy;
26
import io.grpc.Attributes;
27
import io.grpc.ClientCall;
28
import io.grpc.Context;
29
import io.grpc.Deadline;
30
import io.grpc.Metadata;
31
import io.grpc.Status;
32
import java.util.ArrayList;
33
import java.util.List;
34
import java.util.Locale;
35
import java.util.concurrent.Executor;
36
import java.util.concurrent.ScheduledExecutorService;
37
import java.util.concurrent.ScheduledFuture;
38
import java.util.concurrent.TimeUnit;
39
import java.util.logging.Level;
40
import java.util.logging.Logger;
41
import javax.annotation.Nullable;
42

43
/**
44
 * A call that queues requests before a real call is ready to be delegated to.
45
 *
46
 * <p>{@code ClientCall} itself doesn't require thread-safety. However, the state of {@code
47
 * DelayedCall} may be internally altered by different threads, thus internal synchronization is
48
 * necessary.
49
 */
50
public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
51
  private static final Logger logger = Logger.getLogger(DelayedClientCall.class.getName());
1✔
52
  /**
53
   * A timer to monitor the initial deadline. The timer must be cancelled on transition to the real
54
   * call.
55
   */
56
  @Nullable
57
  private final ScheduledFuture<?> initialDeadlineMonitor;
58
  private final Executor callExecutor;
59
  private final Context context;
60
  /** {@code true} once realCall is valid and all pending calls have been drained. */
61
  private volatile boolean passThrough;
62
  /**
63
   * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
64
   * order, but also used if an error occurs before {@code realCall} is set.
65
   */
66
  private Listener<RespT> listener;
67
  // No need to synchronize; start() synchronization provides a happens-before
68
  private Metadata startHeaders;
69
  // Must hold {@code this} lock when setting.
70
  private ClientCall<ReqT, RespT> realCall;
71
  @GuardedBy("this")
72
  private Status error;
73
  @GuardedBy("this")
1✔
74
  private List<Runnable> pendingRunnables = new ArrayList<>();
75
  @GuardedBy("this")
76
  private DelayedListener<RespT> delayedListener;
77

78
  protected DelayedClientCall(
79
      Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
1✔
80
    this.callExecutor = checkNotNull(callExecutor, "callExecutor");
1✔
81
    checkNotNull(scheduler, "scheduler");
1✔
82
    context = Context.current();
1✔
83
    initialDeadlineMonitor = scheduleDeadlineIfNeeded(scheduler, deadline);
1✔
84
  }
1✔
85

86
  // If one argument is null, consider the other the "Before"
87
  private boolean isAbeforeB(@Nullable Deadline a, @Nullable Deadline b) {
88
    if (b == null) {
1✔
89
      return true;
1✔
90
    } else if (a == null) {
×
91
      return false;
×
92
    }
93

94
    return a.isBefore(b);
×
95
  }
96

97
  @Nullable
98
  private ScheduledFuture<?> scheduleDeadlineIfNeeded(
99
      ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
100
    Deadline contextDeadline = context.getDeadline();
1✔
101
    String deadlineName;
102
    long remainingNanos;
103
    if (deadline != null && isAbeforeB(deadline, contextDeadline)) {
1✔
104
      deadlineName = "CallOptions";
1✔
105
      remainingNanos = deadline.timeRemaining(NANOSECONDS);
1✔
106
    } else if (contextDeadline != null) {
1✔
107
      deadlineName = "Context";
×
108
      remainingNanos = contextDeadline.timeRemaining(NANOSECONDS);
×
109
      if (logger.isLoggable(Level.FINE)) {
×
110
        StringBuilder builder =
×
111
            new StringBuilder(
112
                String.format(
×
113
                    Locale.US,
114
                    "Call timeout set to '%d' ns, due to context deadline.", remainingNanos));
×
115
        if (deadline == null) {
×
116
          builder.append(" Explicit call timeout was not set.");
×
117
        } else {
118
          long callTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS);
×
119
          builder.append(String.format(
×
120
              Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
×
121
        }
122
        logger.fine(builder.toString());
×
123
      }
×
124
    } else {
125
      return null;
1✔
126
    }
127

128
    /* Cancels the call if deadline exceeded prior to the real call being set. */
129
    class DeadlineExceededRunnable implements Runnable {
1✔
130
      @Override
131
      public void run() {
132
        long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
133
        long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
134
        StringBuilder buf = new StringBuilder();
1✔
135
        if (remainingNanos < 0) {
1✔
136
          buf.append("ClientCall started after ");
1✔
137
          buf.append(deadlineName);
1✔
138
          buf.append(" deadline was exceeded. Deadline has been exceeded for ");
1✔
139
        } else {
140
          buf.append("Deadline ");
1✔
141
          buf.append(deadlineName);
1✔
142
          buf.append(" was exceeded after ");
1✔
143
        }
144
        buf.append(seconds);
1✔
145
        buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
146
        buf.append("s");
1✔
147
        cancel(
1✔
148
            Status.DEADLINE_EXCEEDED.withDescription(buf.toString()),
1✔
149
            // We should not cancel the call if the realCall is set because there could be a
150
            // race between cancel() and realCall.start(). The realCall will handle deadline by
151
            // itself.
152
            /* onlyCancelPendingCall= */ true);
153
      }
1✔
154
    }
155

156
    return scheduler.schedule(new DeadlineExceededRunnable(), remainingNanos, NANOSECONDS);
1✔
157
  }
158

159
  /**
160
   * Transfers all pending and future requests and mutations to the given call.
161
   *
162
   * <p>No-op if either this method or {@link #cancel} have already been called.
163
   */
164
  // When this method returns, passThrough is guaranteed to be true
165
  public final Runnable setCall(ClientCall<ReqT, RespT> call) {
166
    Listener<RespT> savedDelayedListener;
167
    synchronized (this) {
1✔
168
      // If realCall != null, then either setCall() or cancel() has been called.
169
      if (realCall != null) {
1✔
170
        return null;
1✔
171
      }
172
      setRealCall(checkNotNull(call, "call"));
1✔
173
      // start() not yet called
174
      if (delayedListener == null) {
1✔
175
        assert pendingRunnables.isEmpty();
1✔
176
        pendingRunnables = null;
1✔
177
        passThrough = true;
1✔
178
        return null;
1✔
179
      }
180
      savedDelayedListener = this.delayedListener;
1✔
181
    }
1✔
182
    internalStart(savedDelayedListener);
1✔
183
    return new ContextRunnable(context) {
1✔
184
      @Override
185
      public void runInContext() {
186
        drainPendingCalls();
1✔
187
      }
1✔
188
    };
189
  }
190

191
  private void internalStart(Listener<RespT> listener) {
192
    Metadata savedStartHeaders = this.startHeaders;
1✔
193
    this.startHeaders = null;
1✔
194
    context.run(() -> realCall.start(listener, savedStartHeaders));
1✔
195
  }
1✔
196

197
  @Override
198
  public final void start(Listener<RespT> listener, final Metadata headers) {
199
    checkNotNull(headers, "headers");
1✔
200
    checkState(this.listener == null, "already started");
1✔
201
    Status savedError;
202
    boolean savedPassThrough;
203
    synchronized (this) {
1✔
204
      this.listener = checkNotNull(listener, "listener");
1✔
205
      // If error != null, then cancel() has been called and was unable to close the listener
206
      savedError = error;
1✔
207
      savedPassThrough = passThrough;
1✔
208
      if (!savedPassThrough) {
1✔
209
        listener = delayedListener = new DelayedListener<>(listener);
1✔
210
        startHeaders = headers;
1✔
211
      }
212
    }
1✔
213
    if (savedError != null) {
1✔
214
      callExecutor.execute(new CloseListenerRunnable(listener, savedError));
1✔
215
      return;
1✔
216
    }
217
    if (savedPassThrough) {
1✔
218
      realCall.start(listener, headers);
1✔
219
    } // else realCall.start() will be called by setCall
220
  }
1✔
221

222
  // When this method returns, passThrough is guaranteed to be true
223
  @Override
224
  public final void cancel(@Nullable final String message, @Nullable final Throwable cause) {
225
    Status status = Status.CANCELLED;
1✔
226
    if (message != null) {
1✔
227
      status = status.withDescription(message);
1✔
228
    } else {
229
      status = status.withDescription("Call cancelled without message");
1✔
230
    }
231
    if (cause != null) {
1✔
232
      status = status.withCause(cause);
1✔
233
    }
234
    cancel(status, false);
1✔
235
  }
1✔
236

237
  /**
238
   * Cancels the call unless {@code realCall} is set and {@code onlyCancelPendingCall} is true.
239
   */
240
  private void cancel(final Status status, boolean onlyCancelPendingCall) {
241
    boolean delegateToRealCall = true;
1✔
242
    Listener<RespT> listenerToClose = null;
1✔
243
    synchronized (this) {
1✔
244
      // If realCall != null, then either setCall() or cancel() has been called
245
      if (realCall == null) {
1✔
246
        @SuppressWarnings("unchecked")
247
        ClientCall<ReqT, RespT> noopCall = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
248
        setRealCall(noopCall);
1✔
249
        delegateToRealCall = false;
1✔
250
        // If listener == null, then start() will later call listener with 'error'
251
        listenerToClose = listener;
1✔
252
        error = status;
1✔
253
      } else if (onlyCancelPendingCall) {
1✔
254
        return;
×
255
      }
256
    }
1✔
257
    if (delegateToRealCall) {
1✔
258
      delayOrExecute(new Runnable() {
1✔
259
        @Override
260
        public void run() {
261
          realCall.cancel(status.getDescription(), status.getCause());
1✔
262
        }
1✔
263
      });
264
    } else {
265
      if (listenerToClose != null) {
1✔
266
        callExecutor.execute(new CloseListenerRunnable(listenerToClose, status));
1✔
267
      }
268
      internalStart(listenerToClose); // listener instance doesn't matter
1✔
269
      drainPendingCalls();
1✔
270
    }
271
    callCancelled();
1✔
272
  }
1✔
273

274
  protected void callCancelled() {
275
  }
1✔
276

277
  private void delayOrExecute(Runnable runnable) {
278
    synchronized (this) {
1✔
279
      if (!passThrough) {
1✔
280
        pendingRunnables.add(runnable);
1✔
281
        return;
1✔
282
      }
283
    }
1✔
284
    runnable.run();
1✔
285
  }
1✔
286

287
  /**
288
   * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
289
   * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
290
   * should not be held when calling this method.
291
   */
292
  private void drainPendingCalls() {
293
    assert realCall != null;
1✔
294
    assert !passThrough;
1✔
295
    List<Runnable> toRun = new ArrayList<>();
1✔
296
    DelayedListener<RespT> delayedListener ;
297
    while (true) {
298
      synchronized (this) {
1✔
299
        if (pendingRunnables.isEmpty()) {
1✔
300
          pendingRunnables = null;
1✔
301
          passThrough = true;
1✔
302
          delayedListener = this.delayedListener;
1✔
303
          break;
1✔
304
        }
305
        // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
306
        // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
307
        // drop the lock. So we will have to re-check pendingCalls.
308
        List<Runnable> tmp = toRun;
1✔
309
        toRun = pendingRunnables;
1✔
310
        pendingRunnables = tmp;
1✔
311
      }
1✔
312
      for (Runnable runnable : toRun) {
1✔
313
        // Must not call transport while lock is held to prevent deadlocks.
314
        // TODO(ejona): exception handling
315
        runnable.run();
1✔
316
      }
1✔
317
      toRun.clear();
1✔
318
    }
319
    if (delayedListener != null) {
1✔
320
      final DelayedListener<RespT> listener = delayedListener;
1✔
321
      class DrainListenerRunnable extends ContextRunnable {
322
        DrainListenerRunnable() {
1✔
323
          super(context);
1✔
324
        }
1✔
325

326
        @Override
327
        public void runInContext() {
328
          listener.drainPendingCallbacks();
1✔
329
        }
1✔
330
      }
331

332
      callExecutor.execute(new DrainListenerRunnable());
1✔
333
    }
334
  }
1✔
335

336
  @GuardedBy("this")
337
  private void setRealCall(ClientCall<ReqT, RespT> realCall) {
338
    checkState(this.realCall == null, "realCall already set to %s", this.realCall);
1✔
339
    if (initialDeadlineMonitor != null) {
1✔
340
      initialDeadlineMonitor.cancel(false);
1✔
341
    }
342
    this.realCall = realCall;
1✔
343
  }
1✔
344

345
  @VisibleForTesting
346
  final ClientCall<ReqT, RespT> getRealCall() {
347
    return realCall;
×
348
  }
349

350
  @Override
351
  public final void sendMessage(final ReqT message) {
352
    if (passThrough) {
1✔
353
      realCall.sendMessage(message);
1✔
354
    } else {
355
      delayOrExecute(new Runnable() {
1✔
356
        @Override
357
        public void run() {
358
          realCall.sendMessage(message);
1✔
359
        }
1✔
360
      });
361
    }
362
  }
1✔
363

364
  @Override
365
  public final void setMessageCompression(final boolean enable) {
366
    if (passThrough) {
1✔
367
      realCall.setMessageCompression(enable);
1✔
368
    } else {
369
      delayOrExecute(new Runnable() {
1✔
370
        @Override
371
        public void run() {
372
          realCall.setMessageCompression(enable);
1✔
373
        }
1✔
374
      });
375
    }
376
  }
1✔
377

378
  @Override
379
  public final void request(final int numMessages) {
380
    if (passThrough) {
1✔
381
      realCall.request(numMessages);
1✔
382
    } else {
383
      delayOrExecute(new Runnable() {
1✔
384
        @Override
385
        public void run() {
386
          realCall.request(numMessages);
1✔
387
        }
1✔
388
      });
389
    }
390
  }
1✔
391

392
  @Override
393
  public final void halfClose() {
394
    delayOrExecute(new Runnable() {
1✔
395
      @Override
396
      public void run() {
397
        realCall.halfClose();
1✔
398
      }
1✔
399
    });
400
  }
1✔
401

402
  @Override
403
  public final boolean isReady() {
404
    if (passThrough) {
1✔
405
      return realCall.isReady();
1✔
406
    } else {
407
      return false;
×
408
    }
409
  }
410

411
  @Override
412
  public final Attributes getAttributes() {
413
    ClientCall<ReqT, RespT> savedRealCall;
414
    synchronized (this) {
1✔
415
      savedRealCall = realCall;
1✔
416
    }
1✔
417
    if (savedRealCall != null) {
1✔
418
      return savedRealCall.getAttributes();
1✔
419
    } else {
420
      return Attributes.EMPTY;
×
421
    }
422
  }
423

424
  @Override
425
  public String toString() {
426
    return MoreObjects.toStringHelper(this)
×
427
        .add("realCall", realCall)
×
428
        .toString();
×
429
  }
430

431
  private final class CloseListenerRunnable extends ContextRunnable {
432
    final Listener<RespT> listener;
433
    final Status status;
434

435
    CloseListenerRunnable(Listener<RespT> listener, Status status) {
1✔
436
      super(context);
1✔
437
      this.listener = listener;
1✔
438
      this.status = status;
1✔
439
    }
1✔
440

441
    @Override
442
    public void runInContext() {
443
      listener.onClose(status, new Metadata());
1✔
444
    }
1✔
445
  }
446

447
  private static final class DelayedListener<RespT> extends Listener<RespT> {
1✔
448
    private final Listener<RespT> realListener;
449
    private volatile boolean passThrough;
450
    @GuardedBy("this")
1✔
451
    private List<Runnable> pendingCallbacks = new ArrayList<>();
452

453
    public DelayedListener(Listener<RespT> listener) {
1✔
454
      this.realListener = listener;
1✔
455
    }
1✔
456

457
    private void delayOrExecute(Runnable runnable) {
458
      synchronized (this) {
1✔
459
        if (!passThrough) {
1✔
460
          pendingCallbacks.add(runnable);
1✔
461
          return;
1✔
462
        }
463
      }
1✔
464
      runnable.run();
1✔
465
    }
1✔
466

467
    @Override
468
    public void onHeaders(final Metadata headers) {
469
      if (passThrough) {
1✔
470
        realListener.onHeaders(headers);
1✔
471
      } else {
472
        delayOrExecute(new Runnable() {
×
473
          @Override
474
          public void run() {
475
            realListener.onHeaders(headers);
×
476
          }
×
477
        });
478
      }
479
    }
1✔
480

481
    @Override
482
    public void onMessage(final RespT message) {
483
      if (passThrough) {
1✔
484
        realListener.onMessage(message);
1✔
485
      } else {
486
        delayOrExecute(new Runnable() {
×
487
          @Override
488
          public void run() {
489
            realListener.onMessage(message);
×
490
          }
×
491
        });
492
      }
493
    }
1✔
494

495
    @Override
496
    public void onClose(final Status status, final Metadata trailers) {
497
      delayOrExecute(new Runnable() {
1✔
498
        @Override
499
        public void run() {
500
          realListener.onClose(status, trailers);
1✔
501
        }
1✔
502
      });
503
    }
1✔
504

505
    @Override
506
    public void onReady() {
507
      if (passThrough) {
1✔
508
        realListener.onReady();
1✔
509
      } else {
510
        delayOrExecute(new Runnable() {
1✔
511
          @Override
512
          public void run() {
513
            realListener.onReady();
1✔
514
          }
1✔
515
        });
516
      }
517
    }
1✔
518

519
    void drainPendingCallbacks() {
520
      assert !passThrough;
1✔
521
      List<Runnable> toRun = new ArrayList<>();
1✔
522
      while (true) {
523
        synchronized (this) {
1✔
524
          if (pendingCallbacks.isEmpty()) {
1✔
525
            pendingCallbacks = null;
1✔
526
            passThrough = true;
1✔
527
            break;
1✔
528
          }
529
          // Since there were pendingCallbacks, we need to process them. To maintain ordering we
530
          // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
531
          // added after we drop the lock. So we will have to re-check pendingCallbacks.
532
          List<Runnable> tmp = toRun;
1✔
533
          toRun = pendingCallbacks;
1✔
534
          pendingCallbacks = tmp;
1✔
535
        }
1✔
536
        for (Runnable runnable : toRun) {
1✔
537
          // Avoid calling listener while lock is held to prevent deadlocks.
538
          // TODO(ejona): exception handling
539
          runnable.run();
1✔
540
        }
1✔
541
        toRun.clear();
1✔
542
      }
543
    }
1✔
544
  }
545

546
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
547
    @Override
548
    public void start(Listener<Object> responseListener, Metadata headers) {}
1✔
549

550
    @Override
551
    public void request(int numMessages) {}
1✔
552

553
    @Override
554
    public void cancel(String message, Throwable cause) {}
1✔
555

556
    @Override
557
    public void halfClose() {}
1✔
558

559
    @Override
560
    public void sendMessage(Object message) {}
1✔
561

562
    // Always returns {@code false}, since this is only used when the startup of the call fails.
563
    @Override
564
    public boolean isReady() {
565
      return false;
×
566
    }
567
  };
568
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc