• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19962

28 Aug 2025 03:57AM UTC coverage: 88.537% (-0.02%) from 88.554%
#19962

push

github

web-flow
servlet: extract ServletServerStream.serializeHeaders() method (#12299)

34680 of 39170 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.65
/../core/src/main/java/io/grpc/internal/DelayedClientCall.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static java.util.concurrent.TimeUnit.NANOSECONDS;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.errorprone.annotations.concurrent.GuardedBy;
26
import io.grpc.Attributes;
27
import io.grpc.ClientCall;
28
import io.grpc.Context;
29
import io.grpc.Deadline;
30
import io.grpc.Metadata;
31
import io.grpc.Status;
32
import java.util.ArrayList;
33
import java.util.List;
34
import java.util.Locale;
35
import java.util.concurrent.Executor;
36
import java.util.concurrent.ScheduledExecutorService;
37
import java.util.concurrent.ScheduledFuture;
38
import java.util.concurrent.TimeUnit;
39
import java.util.logging.Level;
40
import java.util.logging.Logger;
41
import javax.annotation.Nullable;
42

43
/**
44
 * A call that queues requests before a real call is ready to be delegated to.
45
 *
46
 * <p>{@code ClientCall} itself doesn't require thread-safety. However, the state of {@code
47
 * DelayedCall} may be internally altered by different threads, thus internal synchronization is
48
 * necessary.
49
 */
50
public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
51
  private static final Logger logger = Logger.getLogger(DelayedClientCall.class.getName());
1✔
52
  /**
53
   * A timer to monitor the initial deadline. The timer must be cancelled on transition to the real
54
   * call.
55
   */
56
  @Nullable
57
  private final ScheduledFuture<?> initialDeadlineMonitor;
58
  private final Executor callExecutor;
59
  private final Context context;
60
  /** {@code true} once realCall is valid and all pending calls have been drained. */
61
  private volatile boolean passThrough;
62
  /**
63
   * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
64
   * order, but also used if an error occurs before {@code realCall} is set.
65
   */
66
  private Listener<RespT> listener;
67
  // Must hold {@code this} lock when setting.
68
  private ClientCall<ReqT, RespT> realCall;
69
  @GuardedBy("this")
70
  private Status error;
71
  @GuardedBy("this")
1✔
72
  private List<Runnable> pendingRunnables = new ArrayList<>();
73
  @GuardedBy("this")
74
  private DelayedListener<RespT> delayedListener;
75

76
  protected DelayedClientCall(
77
      Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
1✔
78
    this.callExecutor = checkNotNull(callExecutor, "callExecutor");
1✔
79
    checkNotNull(scheduler, "scheduler");
1✔
80
    context = Context.current();
1✔
81
    initialDeadlineMonitor = scheduleDeadlineIfNeeded(scheduler, deadline);
1✔
82
  }
1✔
83

84
  // If one argument is null, consider the other the "Before"
85
  private boolean isAbeforeB(@Nullable Deadline a, @Nullable Deadline b) {
86
    if (b == null) {
1✔
87
      return true;
1✔
88
    } else if (a == null) {
×
89
      return false;
×
90
    }
91

92
    return a.isBefore(b);
×
93
  }
94

95
  @Nullable
96
  private ScheduledFuture<?> scheduleDeadlineIfNeeded(
97
      ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
98
    Deadline contextDeadline = context.getDeadline();
1✔
99
    String deadlineName;
100
    long remainingNanos;
101
    if (deadline != null && isAbeforeB(deadline, contextDeadline)) {
1✔
102
      deadlineName = "CallOptions";
1✔
103
      remainingNanos = deadline.timeRemaining(NANOSECONDS);
1✔
104
    } else if (contextDeadline != null) {
1✔
105
      deadlineName = "Context";
×
106
      remainingNanos = contextDeadline.timeRemaining(NANOSECONDS);
×
107
      if (logger.isLoggable(Level.FINE)) {
×
108
        StringBuilder builder =
×
109
            new StringBuilder(
110
                String.format(
×
111
                    Locale.US,
112
                    "Call timeout set to '%d' ns, due to context deadline.", remainingNanos));
×
113
        if (deadline == null) {
×
114
          builder.append(" Explicit call timeout was not set.");
×
115
        } else {
116
          long callTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS);
×
117
          builder.append(String.format(
×
118
              Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
×
119
        }
120
        logger.fine(builder.toString());
×
121
      }
×
122
    } else {
123
      return null;
1✔
124
    }
125

126
    /* Cancels the call if deadline exceeded prior to the real call being set. */
127
    class DeadlineExceededRunnable implements Runnable {
1✔
128
      @Override
129
      public void run() {
130
        long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
131
        long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
132
        StringBuilder buf = new StringBuilder();
1✔
133
        if (remainingNanos < 0) {
1✔
134
          buf.append("ClientCall started after ");
1✔
135
          buf.append(deadlineName);
1✔
136
          buf.append(" deadline was exceeded. Deadline has been exceeded for ");
1✔
137
        } else {
138
          buf.append("Deadline ");
1✔
139
          buf.append(deadlineName);
1✔
140
          buf.append(" was exceeded after ");
1✔
141
        }
142
        buf.append(seconds);
1✔
143
        buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
144
        buf.append("s");
1✔
145
        cancel(
1✔
146
            Status.DEADLINE_EXCEEDED.withDescription(buf.toString()),
1✔
147
            // We should not cancel the call if the realCall is set because there could be a
148
            // race between cancel() and realCall.start(). The realCall will handle deadline by
149
            // itself.
150
            /* onlyCancelPendingCall= */ true);
151
      }
1✔
152
    }
153

154
    return scheduler.schedule(new DeadlineExceededRunnable(), remainingNanos, NANOSECONDS);
1✔
155
  }
156

157
  /**
158
   * Transfers all pending and future requests and mutations to the given call.
159
   *
160
   * <p>No-op if either this method or {@link #cancel} have already been called.
161
   */
162
  // When this method returns, passThrough is guaranteed to be true
163
  public final Runnable setCall(ClientCall<ReqT, RespT> call) {
164
    synchronized (this) {
1✔
165
      // If realCall != null, then either setCall() or cancel() has been called.
166
      if (realCall != null) {
1✔
167
        return null;
1✔
168
      }
169
      setRealCall(checkNotNull(call, "call"));
1✔
170
    }
1✔
171
    return new ContextRunnable(context) {
1✔
172
      @Override
173
      public void runInContext() {
174
        drainPendingCalls();
1✔
175
      }
1✔
176
    };
177
  }
178

179
  @Override
180
  public final void start(Listener<RespT> listener, final Metadata headers) {
181
    checkState(this.listener == null, "already started");
1✔
182
    Status savedError;
183
    boolean savedPassThrough;
184
    synchronized (this) {
1✔
185
      this.listener = checkNotNull(listener, "listener");
1✔
186
      // If error != null, then cancel() has been called and was unable to close the listener
187
      savedError = error;
1✔
188
      savedPassThrough = passThrough;
1✔
189
      if (!savedPassThrough) {
1✔
190
        listener = delayedListener = new DelayedListener<>(listener);
1✔
191
      }
192
    }
1✔
193
    if (savedError != null) {
1✔
194
      callExecutor.execute(new CloseListenerRunnable(listener, savedError));
1✔
195
      return;
1✔
196
    }
197
    if (savedPassThrough) {
1✔
198
      realCall.start(listener, headers);
1✔
199
    } else {
200
      final Listener<RespT> finalListener = listener;
1✔
201
      delayOrExecute(new Runnable() {
1✔
202
        @Override
203
        public void run() {
204
          realCall.start(finalListener, headers);
1✔
205
        }
1✔
206
      });
207
    }
208
  }
1✔
209

210
  // When this method returns, passThrough is guaranteed to be true
211
  @Override
212
  public final void cancel(@Nullable final String message, @Nullable final Throwable cause) {
213
    Status status = Status.CANCELLED;
1✔
214
    if (message != null) {
1✔
215
      status = status.withDescription(message);
1✔
216
    } else {
217
      status = status.withDescription("Call cancelled without message");
1✔
218
    }
219
    if (cause != null) {
1✔
220
      status = status.withCause(cause);
1✔
221
    }
222
    cancel(status, false);
1✔
223
  }
1✔
224

225
  /**
226
   * Cancels the call unless {@code realCall} is set and {@code onlyCancelPendingCall} is true.
227
   */
228
  private void cancel(final Status status, boolean onlyCancelPendingCall) {
229
    boolean delegateToRealCall = true;
1✔
230
    Listener<RespT> listenerToClose = null;
1✔
231
    synchronized (this) {
1✔
232
      // If realCall != null, then either setCall() or cancel() has been called
233
      if (realCall == null) {
1✔
234
        @SuppressWarnings("unchecked")
235
        ClientCall<ReqT, RespT> noopCall = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
236
        setRealCall(noopCall);
1✔
237
        delegateToRealCall = false;
1✔
238
        // If listener == null, then start() will later call listener with 'error'
239
        listenerToClose = listener;
1✔
240
        error = status;
1✔
241
      } else if (onlyCancelPendingCall) {
1✔
242
        return;
×
243
      }
244
    }
1✔
245
    if (delegateToRealCall) {
1✔
246
      delayOrExecute(new Runnable() {
1✔
247
        @Override
248
        public void run() {
249
          realCall.cancel(status.getDescription(), status.getCause());
1✔
250
        }
1✔
251
      });
252
    } else {
253
      if (listenerToClose != null) {
1✔
254
        callExecutor.execute(new CloseListenerRunnable(listenerToClose, status));
1✔
255
      }
256
      drainPendingCalls();
1✔
257
    }
258
    callCancelled();
1✔
259
  }
1✔
260

261
  protected void callCancelled() {
262
  }
1✔
263

264
  private void delayOrExecute(Runnable runnable) {
265
    synchronized (this) {
1✔
266
      if (!passThrough) {
1✔
267
        pendingRunnables.add(runnable);
1✔
268
        return;
1✔
269
      }
270
    }
1✔
271
    runnable.run();
1✔
272
  }
1✔
273

274
  /**
275
   * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
276
   * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
277
   * should not be held when calling this method.
278
   */
279
  private void drainPendingCalls() {
280
    assert realCall != null;
1✔
281
    assert !passThrough;
1✔
282
    List<Runnable> toRun = new ArrayList<>();
1✔
283
    DelayedListener<RespT> delayedListener ;
284
    while (true) {
285
      synchronized (this) {
1✔
286
        if (pendingRunnables.isEmpty()) {
1✔
287
          pendingRunnables = null;
1✔
288
          passThrough = true;
1✔
289
          delayedListener = this.delayedListener;
1✔
290
          break;
1✔
291
        }
292
        // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
293
        // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
294
        // drop the lock. So we will have to re-check pendingCalls.
295
        List<Runnable> tmp = toRun;
1✔
296
        toRun = pendingRunnables;
1✔
297
        pendingRunnables = tmp;
1✔
298
      }
1✔
299
      for (Runnable runnable : toRun) {
1✔
300
        // Must not call transport while lock is held to prevent deadlocks.
301
        // TODO(ejona): exception handling
302
        runnable.run();
1✔
303
      }
1✔
304
      toRun.clear();
1✔
305
    }
306
    if (delayedListener != null) {
1✔
307
      final DelayedListener<RespT> listener = delayedListener;
1✔
308
      class DrainListenerRunnable extends ContextRunnable {
309
        DrainListenerRunnable() {
1✔
310
          super(context);
1✔
311
        }
1✔
312

313
        @Override
314
        public void runInContext() {
315
          listener.drainPendingCallbacks();
1✔
316
        }
1✔
317
      }
318

319
      callExecutor.execute(new DrainListenerRunnable());
1✔
320
    }
321
  }
1✔
322

323
  @GuardedBy("this")
324
  private void setRealCall(ClientCall<ReqT, RespT> realCall) {
325
    checkState(this.realCall == null, "realCall already set to %s", this.realCall);
1✔
326
    if (initialDeadlineMonitor != null) {
1✔
327
      initialDeadlineMonitor.cancel(false);
1✔
328
    }
329
    this.realCall = realCall;
1✔
330
  }
1✔
331

332
  @VisibleForTesting
333
  final ClientCall<ReqT, RespT> getRealCall() {
334
    return realCall;
×
335
  }
336

337
  @Override
338
  public final void sendMessage(final ReqT message) {
339
    if (passThrough) {
1✔
340
      realCall.sendMessage(message);
1✔
341
    } else {
342
      delayOrExecute(new Runnable() {
1✔
343
        @Override
344
        public void run() {
345
          realCall.sendMessage(message);
1✔
346
        }
1✔
347
      });
348
    }
349
  }
1✔
350

351
  @Override
352
  public final void setMessageCompression(final boolean enable) {
353
    if (passThrough) {
1✔
354
      realCall.setMessageCompression(enable);
1✔
355
    } else {
356
      delayOrExecute(new Runnable() {
1✔
357
        @Override
358
        public void run() {
359
          realCall.setMessageCompression(enable);
1✔
360
        }
1✔
361
      });
362
    }
363
  }
1✔
364

365
  @Override
366
  public final void request(final int numMessages) {
367
    if (passThrough) {
1✔
368
      realCall.request(numMessages);
1✔
369
    } else {
370
      delayOrExecute(new Runnable() {
1✔
371
        @Override
372
        public void run() {
373
          realCall.request(numMessages);
1✔
374
        }
1✔
375
      });
376
    }
377
  }
1✔
378

379
  @Override
380
  public final void halfClose() {
381
    delayOrExecute(new Runnable() {
1✔
382
      @Override
383
      public void run() {
384
        realCall.halfClose();
1✔
385
      }
1✔
386
    });
387
  }
1✔
388

389
  @Override
390
  public final boolean isReady() {
391
    if (passThrough) {
1✔
392
      return realCall.isReady();
1✔
393
    } else {
394
      return false;
×
395
    }
396
  }
397

398
  @Override
399
  public final Attributes getAttributes() {
400
    ClientCall<ReqT, RespT> savedRealCall;
401
    synchronized (this) {
1✔
402
      savedRealCall = realCall;
1✔
403
    }
1✔
404
    if (savedRealCall != null) {
1✔
405
      return savedRealCall.getAttributes();
1✔
406
    } else {
407
      return Attributes.EMPTY;
×
408
    }
409
  }
410

411
  @Override
412
  public String toString() {
413
    return MoreObjects.toStringHelper(this)
×
414
        .add("realCall", realCall)
×
415
        .toString();
×
416
  }
417

418
  private final class CloseListenerRunnable extends ContextRunnable {
419
    final Listener<RespT> listener;
420
    final Status status;
421

422
    CloseListenerRunnable(Listener<RespT> listener, Status status) {
1✔
423
      super(context);
1✔
424
      this.listener = listener;
1✔
425
      this.status = status;
1✔
426
    }
1✔
427

428
    @Override
429
    public void runInContext() {
430
      listener.onClose(status, new Metadata());
1✔
431
    }
1✔
432
  }
433

434
  private static final class DelayedListener<RespT> extends Listener<RespT> {
1✔
435
    private final Listener<RespT> realListener;
436
    private volatile boolean passThrough;
437
    @GuardedBy("this")
1✔
438
    private List<Runnable> pendingCallbacks = new ArrayList<>();
439

440
    public DelayedListener(Listener<RespT> listener) {
1✔
441
      this.realListener = listener;
1✔
442
    }
1✔
443

444
    private void delayOrExecute(Runnable runnable) {
445
      synchronized (this) {
1✔
446
        if (!passThrough) {
1✔
447
          pendingCallbacks.add(runnable);
1✔
448
          return;
1✔
449
        }
450
      }
1✔
451
      runnable.run();
1✔
452
    }
1✔
453

454
    @Override
455
    public void onHeaders(final Metadata headers) {
456
      if (passThrough) {
1✔
457
        realListener.onHeaders(headers);
1✔
458
      } else {
459
        delayOrExecute(new Runnable() {
×
460
          @Override
461
          public void run() {
462
            realListener.onHeaders(headers);
×
463
          }
×
464
        });
465
      }
466
    }
1✔
467

468
    @Override
469
    public void onMessage(final RespT message) {
470
      if (passThrough) {
1✔
471
        realListener.onMessage(message);
1✔
472
      } else {
473
        delayOrExecute(new Runnable() {
×
474
          @Override
475
          public void run() {
476
            realListener.onMessage(message);
×
477
          }
×
478
        });
479
      }
480
    }
1✔
481

482
    @Override
483
    public void onClose(final Status status, final Metadata trailers) {
484
      delayOrExecute(new Runnable() {
1✔
485
        @Override
486
        public void run() {
487
          realListener.onClose(status, trailers);
1✔
488
        }
1✔
489
      });
490
    }
1✔
491

492
    @Override
493
    public void onReady() {
494
      if (passThrough) {
1✔
495
        realListener.onReady();
1✔
496
      } else {
497
        delayOrExecute(new Runnable() {
1✔
498
          @Override
499
          public void run() {
500
            realListener.onReady();
1✔
501
          }
1✔
502
        });
503
      }
504
    }
1✔
505

506
    void drainPendingCallbacks() {
507
      assert !passThrough;
1✔
508
      List<Runnable> toRun = new ArrayList<>();
1✔
509
      while (true) {
510
        synchronized (this) {
1✔
511
          if (pendingCallbacks.isEmpty()) {
1✔
512
            pendingCallbacks = null;
1✔
513
            passThrough = true;
1✔
514
            break;
1✔
515
          }
516
          // Since there were pendingCallbacks, we need to process them. To maintain ordering we
517
          // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
518
          // added after we drop the lock. So we will have to re-check pendingCallbacks.
519
          List<Runnable> tmp = toRun;
1✔
520
          toRun = pendingCallbacks;
1✔
521
          pendingCallbacks = tmp;
1✔
522
        }
1✔
523
        for (Runnable runnable : toRun) {
1✔
524
          // Avoid calling listener while lock is held to prevent deadlocks.
525
          // TODO(ejona): exception handling
526
          runnable.run();
1✔
527
        }
1✔
528
        toRun.clear();
1✔
529
      }
530
    }
1✔
531
  }
532

533
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
534
    @Override
535
    public void start(Listener<Object> responseListener, Metadata headers) {}
1✔
536

537
    @Override
538
    public void request(int numMessages) {}
1✔
539

540
    @Override
541
    public void cancel(String message, Throwable cause) {}
1✔
542

543
    @Override
544
    public void halfClose() {}
1✔
545

546
    @Override
547
    public void sendMessage(Object message) {}
1✔
548

549
    // Always returns {@code false}, since this is only used when the startup of the call fails.
550
    @Override
551
    public boolean isReady() {
552
      return false;
×
553
    }
554
  };
555
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc