• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19679

05 Feb 2025 07:10PM CUT coverage: 88.584% (+0.02%) from 88.566%
#19679

push

github

web-flow
use gradle java-platform in grpc-bom; Fixes #5530 (#11875)

* use gradle java-platform in grpc-bom; Fixes #5530

* fix withXml

* explicitly exclude grpc-compiler

33762 of 38113 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.07
/../core/src/main/java/io/grpc/internal/DelayedClientCall.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static java.util.concurrent.TimeUnit.NANOSECONDS;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.errorprone.annotations.concurrent.GuardedBy;
26
import io.grpc.Attributes;
27
import io.grpc.ClientCall;
28
import io.grpc.Context;
29
import io.grpc.Deadline;
30
import io.grpc.Metadata;
31
import io.grpc.Status;
32
import java.util.ArrayList;
33
import java.util.List;
34
import java.util.Locale;
35
import java.util.concurrent.Executor;
36
import java.util.concurrent.ScheduledExecutorService;
37
import java.util.concurrent.ScheduledFuture;
38
import java.util.concurrent.TimeUnit;
39
import java.util.logging.Level;
40
import java.util.logging.Logger;
41
import javax.annotation.Nullable;
42

43
/**
44
 * A call that queues requests before a real call is ready to be delegated to.
45
 *
46
 * <p>{@code ClientCall} itself doesn't require thread-safety. However, the state of {@code
47
 * DelayedClientCall} may be altered internally by different threads, so synchronization is
48
 * necessary.
49
 */
50
public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
51
  private static final Logger logger = Logger.getLogger(DelayedClientCall.class.getName());
1✔
52
  /**
53
   * A timer to monitor the initial deadline. The timer must be cancelled on transition to the real
54
   * call.
55
   */
56
  @Nullable
57
  private final ScheduledFuture<?> initialDeadlineMonitor;
58
  private final Executor callExecutor;
59
  private final Context context;
60
  /** {@code true} once realCall is valid and all pending calls have been drained. */
61
  private volatile boolean passThrough;
62
  /**
63
   * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
64
   * order, but also used if an error occurs before {@code realCall} is set.
65
   */
66
  private Listener<RespT> listener;
67
  // Must hold {@code this} lock when setting.
68
  private ClientCall<ReqT, RespT> realCall;
69
  @GuardedBy("this")
70
  private Status error;
71
  @GuardedBy("this")
1✔
72
  private List<Runnable> pendingRunnables = new ArrayList<>();
73
  @GuardedBy("this")
74
  private DelayedListener<RespT> delayedListener;
75

76
  /**
   * Creates a delayed call bound to the {@link Context} that is current at construction time.
   *
   * @param callExecutor executor on which this class runs listener callbacks; must not be null
   * @param scheduler scheduler used to arm the initial deadline timer; must not be null
   * @param deadline the explicit deadline of the call, or {@code null} if none was set
   */
  protected DelayedClientCall(
      Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
    checkNotNull(callExecutor, "callExecutor");
    checkNotNull(scheduler, "scheduler");
    this.callExecutor = callExecutor;
    // Capture the context before arming the timer: scheduleDeadlineIfNeeded() reads its deadline.
    this.context = Context.current();
    this.initialDeadlineMonitor = scheduleDeadlineIfNeeded(scheduler, deadline);
  }
1✔
83

84
  // If one argument is null, consider the other the "Before"
85
  private boolean isAbeforeB(@Nullable Deadline a, @Nullable Deadline b) {
86
    if (b == null) {
1✔
87
      return true;
×
88
    } else if (a == null) {
1✔
89
      return false;
1✔
90
    }
91

92
    return a.isBefore(b);
×
93
  }
94

95
  @Nullable
96
  private ScheduledFuture<?> scheduleDeadlineIfNeeded(
97
      ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
98
    Deadline contextDeadline = context.getDeadline();
1✔
99
    if (deadline == null && contextDeadline == null) {
1✔
100
      return null;
1✔
101
    }
102
    long remainingNanos = Long.MAX_VALUE;
1✔
103
    if (deadline != null) {
1✔
104
      remainingNanos = deadline.timeRemaining(NANOSECONDS);
1✔
105
    }
106

107
    if (contextDeadline != null && contextDeadline.timeRemaining(NANOSECONDS) < remainingNanos) {
1✔
108
      remainingNanos = contextDeadline.timeRemaining(NANOSECONDS);
×
109
      if (logger.isLoggable(Level.FINE)) {
×
110
        StringBuilder builder =
×
111
            new StringBuilder(
112
                String.format(
×
113
                    Locale.US,
114
                    "Call timeout set to '%d' ns, due to context deadline.", remainingNanos));
×
115
        if (deadline == null) {
×
116
          builder.append(" Explicit call timeout was not set.");
×
117
        } else {
118
          long callTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS);
×
119
          builder.append(String.format(
×
120
              Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
×
121
        }
122
        logger.fine(builder.toString());
×
123
      }
124
    }
125

126
    long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
127
    long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
128
    final StringBuilder buf = new StringBuilder();
1✔
129
    String deadlineName = isAbeforeB(contextDeadline, deadline) ? "Context" : "CallOptions";
1✔
130
    if (remainingNanos < 0) {
1✔
131
      buf.append("ClientCall started after ");
1✔
132
      buf.append(deadlineName);
1✔
133
      buf.append(" deadline was exceeded. Deadline has been exceeded for ");
1✔
134
    } else {
135
      buf.append("Deadline ");
1✔
136
      buf.append(deadlineName);
1✔
137
      buf.append(" will be exceeded in ");
1✔
138
    }
139
    buf.append(seconds);
1✔
140
    buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
141
    buf.append("s. ");
1✔
142

143
    /* Cancels the call if deadline exceeded prior to the real call being set. */
144
    class DeadlineExceededRunnable implements Runnable {
1✔
145
      @Override
146
      public void run() {
147
        cancel(
1✔
148
            Status.DEADLINE_EXCEEDED.withDescription(buf.toString()),
1✔
149
            // We should not cancel the call if the realCall is set because there could be a
150
            // race between cancel() and realCall.start(). The realCall will handle deadline by
151
            // itself.
152
            /* onlyCancelPendingCall= */ true);
153
      }
1✔
154
    }
155

156
    return scheduler.schedule(new DeadlineExceededRunnable(), remainingNanos, NANOSECONDS);
1✔
157
  }
158

159
  /**
160
   * Transfers all pending and future requests and mutations to the given call.
161
   *
162
   * <p>No-op if either this method or {@link #cancel} have already been called.
163
   */
164
  // When this method returns, passThrough is guaranteed to be true
165
  public final Runnable setCall(ClientCall<ReqT, RespT> call) {
166
    synchronized (this) {
1✔
167
      // If realCall != null, then either setCall() or cancel() has been called.
168
      if (realCall != null) {
1✔
169
        return null;
1✔
170
      }
171
      setRealCall(checkNotNull(call, "call"));
1✔
172
    }
1✔
173
    return new ContextRunnable(context) {
1✔
174
      @Override
175
      public void runInContext() {
176
        drainPendingCalls();
1✔
177
      }
1✔
178
    };
179
  }
180

181
  @Override
182
  public final void start(Listener<RespT> listener, final Metadata headers) {
183
    checkState(this.listener == null, "already started");
1✔
184
    Status savedError;
185
    boolean savedPassThrough;
186
    synchronized (this) {
1✔
187
      this.listener = checkNotNull(listener, "listener");
1✔
188
      // If error != null, then cancel() has been called and was unable to close the listener
189
      savedError = error;
1✔
190
      savedPassThrough = passThrough;
1✔
191
      if (!savedPassThrough) {
1✔
192
        listener = delayedListener = new DelayedListener<>(listener);
1✔
193
      }
194
    }
1✔
195
    if (savedError != null) {
1✔
196
      callExecutor.execute(new CloseListenerRunnable(listener, savedError));
×
197
      return;
×
198
    }
199
    if (savedPassThrough) {
1✔
200
      realCall.start(listener, headers);
1✔
201
    } else {
202
      final Listener<RespT> finalListener = listener;
1✔
203
      delayOrExecute(new Runnable() {
1✔
204
        @Override
205
        public void run() {
206
          realCall.start(finalListener, headers);
1✔
207
        }
1✔
208
      });
209
    }
210
  }
1✔
211

212
  // When this method returns, passThrough is guaranteed to be true
213
  @Override
214
  public final void cancel(@Nullable final String message, @Nullable final Throwable cause) {
215
    Status status = Status.CANCELLED;
1✔
216
    if (message != null) {
1✔
217
      status = status.withDescription(message);
1✔
218
    } else {
219
      status = status.withDescription("Call cancelled without message");
1✔
220
    }
221
    if (cause != null) {
1✔
222
      status = status.withCause(cause);
1✔
223
    }
224
    cancel(status, false);
1✔
225
  }
1✔
226

227
  /**
228
   * Cancels the call unless {@code realCall} is set and {@code onlyCancelPendingCall} is true.
229
   */
230
  private void cancel(final Status status, boolean onlyCancelPendingCall) {
231
    boolean delegateToRealCall = true;
1✔
232
    Listener<RespT> listenerToClose = null;
1✔
233
    synchronized (this) {
1✔
234
      // If realCall != null, then either setCall() or cancel() has been called
235
      if (realCall == null) {
1✔
236
        @SuppressWarnings("unchecked")
237
        ClientCall<ReqT, RespT> noopCall = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
238
        setRealCall(noopCall);
1✔
239
        delegateToRealCall = false;
1✔
240
        // If listener == null, then start() will later call listener with 'error'
241
        listenerToClose = listener;
1✔
242
        error = status;
1✔
243
      } else if (onlyCancelPendingCall) {
1✔
244
        return;
×
245
      }
246
    }
1✔
247
    if (delegateToRealCall) {
1✔
248
      delayOrExecute(new Runnable() {
1✔
249
        @Override
250
        public void run() {
251
          realCall.cancel(status.getDescription(), status.getCause());
1✔
252
        }
1✔
253
      });
254
    } else {
255
      if (listenerToClose != null) {
1✔
256
        callExecutor.execute(new CloseListenerRunnable(listenerToClose, status));
1✔
257
      }
258
      drainPendingCalls();
1✔
259
    }
260
    callCancelled();
1✔
261
  }
1✔
262

263
  protected void callCancelled() {
264
  }
1✔
265

266
  private void delayOrExecute(Runnable runnable) {
267
    synchronized (this) {
1✔
268
      if (!passThrough) {
1✔
269
        pendingRunnables.add(runnable);
1✔
270
        return;
1✔
271
      }
272
    }
1✔
273
    runnable.run();
1✔
274
  }
1✔
275

276
  /**
277
   * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
278
   * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
279
   * should not be held when calling this method.
280
   */
281
  private void drainPendingCalls() {
282
    assert realCall != null;
1✔
283
    assert !passThrough;
1✔
284
    List<Runnable> toRun = new ArrayList<>();
1✔
285
    DelayedListener<RespT> delayedListener ;
286
    while (true) {
287
      synchronized (this) {
1✔
288
        if (pendingRunnables.isEmpty()) {
1✔
289
          pendingRunnables = null;
1✔
290
          passThrough = true;
1✔
291
          delayedListener = this.delayedListener;
1✔
292
          break;
1✔
293
        }
294
        // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
295
        // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
296
        // drop the lock. So we will have to re-check pendingCalls.
297
        List<Runnable> tmp = toRun;
1✔
298
        toRun = pendingRunnables;
1✔
299
        pendingRunnables = tmp;
1✔
300
      }
1✔
301
      for (Runnable runnable : toRun) {
1✔
302
        // Must not call transport while lock is held to prevent deadlocks.
303
        // TODO(ejona): exception handling
304
        runnable.run();
1✔
305
      }
1✔
306
      toRun.clear();
1✔
307
    }
308
    if (delayedListener != null) {
1✔
309
      final DelayedListener<RespT> listener = delayedListener;
1✔
310
      class DrainListenerRunnable extends ContextRunnable {
311
        DrainListenerRunnable() {
1✔
312
          super(context);
1✔
313
        }
1✔
314

315
        @Override
316
        public void runInContext() {
317
          listener.drainPendingCallbacks();
1✔
318
        }
1✔
319
      }
320

321
      callExecutor.execute(new DrainListenerRunnable());
1✔
322
    }
323
  }
1✔
324

325
  @GuardedBy("this")
326
  private void setRealCall(ClientCall<ReqT, RespT> realCall) {
327
    checkState(this.realCall == null, "realCall already set to %s", this.realCall);
1✔
328
    if (initialDeadlineMonitor != null) {
1✔
329
      initialDeadlineMonitor.cancel(false);
1✔
330
    }
331
    this.realCall = realCall;
1✔
332
  }
1✔
333

334
  @VisibleForTesting
335
  final ClientCall<ReqT, RespT> getRealCall() {
336
    return realCall;
×
337
  }
338

339
  @Override
340
  public final void sendMessage(final ReqT message) {
341
    if (passThrough) {
1✔
342
      realCall.sendMessage(message);
1✔
343
    } else {
344
      delayOrExecute(new Runnable() {
1✔
345
        @Override
346
        public void run() {
347
          realCall.sendMessage(message);
1✔
348
        }
1✔
349
      });
350
    }
351
  }
1✔
352

353
  @Override
354
  public final void setMessageCompression(final boolean enable) {
355
    if (passThrough) {
1✔
356
      realCall.setMessageCompression(enable);
1✔
357
    } else {
358
      delayOrExecute(new Runnable() {
1✔
359
        @Override
360
        public void run() {
361
          realCall.setMessageCompression(enable);
1✔
362
        }
1✔
363
      });
364
    }
365
  }
1✔
366

367
  @Override
368
  public final void request(final int numMessages) {
369
    if (passThrough) {
1✔
370
      realCall.request(numMessages);
1✔
371
    } else {
372
      delayOrExecute(new Runnable() {
1✔
373
        @Override
374
        public void run() {
375
          realCall.request(numMessages);
1✔
376
        }
1✔
377
      });
378
    }
379
  }
1✔
380

381
  @Override
382
  public final void halfClose() {
383
    delayOrExecute(new Runnable() {
1✔
384
      @Override
385
      public void run() {
386
        realCall.halfClose();
1✔
387
      }
1✔
388
    });
389
  }
1✔
390

391
  @Override
392
  public final boolean isReady() {
393
    if (passThrough) {
1✔
394
      return realCall.isReady();
1✔
395
    } else {
396
      return false;
×
397
    }
398
  }
399

400
  @Override
401
  public final Attributes getAttributes() {
402
    ClientCall<ReqT, RespT> savedRealCall;
403
    synchronized (this) {
1✔
404
      savedRealCall = realCall;
1✔
405
    }
1✔
406
    if (savedRealCall != null) {
1✔
407
      return savedRealCall.getAttributes();
1✔
408
    } else {
409
      return Attributes.EMPTY;
×
410
    }
411
  }
412

413
  @Override
414
  public String toString() {
415
    return MoreObjects.toStringHelper(this)
×
416
        .add("realCall", realCall)
×
417
        .toString();
×
418
  }
419

420
  private final class CloseListenerRunnable extends ContextRunnable {
421
    final Listener<RespT> listener;
422
    final Status status;
423

424
    CloseListenerRunnable(Listener<RespT> listener, Status status) {
1✔
425
      super(context);
1✔
426
      this.listener = listener;
1✔
427
      this.status = status;
1✔
428
    }
1✔
429

430
    @Override
431
    public void runInContext() {
432
      listener.onClose(status, new Metadata());
1✔
433
    }
1✔
434
  }
435

436
  private static final class DelayedListener<RespT> extends Listener<RespT> {
1✔
437
    private final Listener<RespT> realListener;
438
    private volatile boolean passThrough;
439
    @GuardedBy("this")
1✔
440
    private List<Runnable> pendingCallbacks = new ArrayList<>();
441

442
    public DelayedListener(Listener<RespT> listener) {
1✔
443
      this.realListener = listener;
1✔
444
    }
1✔
445

446
    private void delayOrExecute(Runnable runnable) {
447
      synchronized (this) {
1✔
448
        if (!passThrough) {
1✔
449
          pendingCallbacks.add(runnable);
1✔
450
          return;
1✔
451
        }
452
      }
1✔
453
      runnable.run();
1✔
454
    }
1✔
455

456
    @Override
457
    public void onHeaders(final Metadata headers) {
458
      if (passThrough) {
1✔
459
        realListener.onHeaders(headers);
1✔
460
      } else {
461
        delayOrExecute(new Runnable() {
×
462
          @Override
463
          public void run() {
464
            realListener.onHeaders(headers);
×
465
          }
×
466
        });
467
      }
468
    }
1✔
469

470
    @Override
471
    public void onMessage(final RespT message) {
472
      if (passThrough) {
1✔
473
        realListener.onMessage(message);
1✔
474
      } else {
475
        delayOrExecute(new Runnable() {
×
476
          @Override
477
          public void run() {
478
            realListener.onMessage(message);
×
479
          }
×
480
        });
481
      }
482
    }
1✔
483

484
    @Override
485
    public void onClose(final Status status, final Metadata trailers) {
486
      delayOrExecute(new Runnable() {
1✔
487
        @Override
488
        public void run() {
489
          realListener.onClose(status, trailers);
1✔
490
        }
1✔
491
      });
492
    }
1✔
493

494
    @Override
495
    public void onReady() {
496
      if (passThrough) {
1✔
497
        realListener.onReady();
1✔
498
      } else {
499
        delayOrExecute(new Runnable() {
1✔
500
          @Override
501
          public void run() {
502
            realListener.onReady();
1✔
503
          }
1✔
504
        });
505
      }
506
    }
1✔
507

508
    void drainPendingCallbacks() {
509
      assert !passThrough;
1✔
510
      List<Runnable> toRun = new ArrayList<>();
1✔
511
      while (true) {
512
        synchronized (this) {
1✔
513
          if (pendingCallbacks.isEmpty()) {
1✔
514
            pendingCallbacks = null;
1✔
515
            passThrough = true;
1✔
516
            break;
1✔
517
          }
518
          // Since there were pendingCallbacks, we need to process them. To maintain ordering we
519
          // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
520
          // added after we drop the lock. So we will have to re-check pendingCallbacks.
521
          List<Runnable> tmp = toRun;
1✔
522
          toRun = pendingCallbacks;
1✔
523
          pendingCallbacks = tmp;
1✔
524
        }
1✔
525
        for (Runnable runnable : toRun) {
1✔
526
          // Avoid calling listener while lock is held to prevent deadlocks.
527
          // TODO(ejona): exception handling
528
          runnable.run();
1✔
529
        }
1✔
530
        toRun.clear();
1✔
531
      }
532
    }
1✔
533
  }
534

535
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
536
    @Override
537
    public void start(Listener<Object> responseListener, Metadata headers) {}
1✔
538

539
    @Override
540
    public void request(int numMessages) {}
1✔
541

542
    @Override
543
    public void cancel(String message, Throwable cause) {}
1✔
544

545
    @Override
546
    public void halfClose() {}
1✔
547

548
    @Override
549
    public void sendMessage(Object message) {}
1✔
550

551
    // Always returns {@code false}, since this is only used when the startup of the call fails.
552
    @Override
553
    public boolean isReady() {
554
      return false;
×
555
    }
556
  };
557
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc