• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19449

05 Sep 2024 10:32PM CUT coverage: 84.488% (-0.01%) from 84.498%
#19449

push

github

ejona86
core: touch() buffer when detach()ing

Detachable lets a buffer outlive its original lifetime. The new lifetime
is application-controlled. If the application fails to read/close the
stream, then the leak detector wouldn't make clear what code was
responsible for the buffer's lifetime. With this touch, we'll be able to
see detach() was called and thus know the application needs debugging.

Realized when looking at b/364531464, although I think the issue is
unrelated.

33251 of 39356 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/../core/src/main/java/io/grpc/internal/DelayedClientCall.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static java.util.concurrent.TimeUnit.NANOSECONDS;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import io.grpc.Attributes;
26
import io.grpc.ClientCall;
27
import io.grpc.Context;
28
import io.grpc.Deadline;
29
import io.grpc.Metadata;
30
import io.grpc.Status;
31
import java.util.ArrayList;
32
import java.util.List;
33
import java.util.Locale;
34
import java.util.concurrent.Executor;
35
import java.util.concurrent.ScheduledExecutorService;
36
import java.util.concurrent.ScheduledFuture;
37
import java.util.concurrent.TimeUnit;
38
import java.util.logging.Level;
39
import java.util.logging.Logger;
40
import javax.annotation.Nullable;
41
import javax.annotation.concurrent.GuardedBy;
42

43
/**
44
 * A call that queues requests before a real call is ready to be delegated to.
45
 *
46
 * <p>{@code ClientCall} itself doesn't require thread-safety. However, the state of {@code
47
 * DelayedCall} may be internally altered by different threads, thus internal synchronization is
48
 * necessary.
49
 */
50
public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
51
  private static final Logger logger = Logger.getLogger(DelayedClientCall.class.getName());
1✔
52
  /**
53
   * A timer to monitor the initial deadline. The timer must be cancelled on transition to the real
54
   * call.
55
   */
56
  @Nullable
57
  private final ScheduledFuture<?> initialDeadlineMonitor;
58
  private final Executor callExecutor;
59
  private final Context context;
60
  /** {@code true} once realCall is valid and all pending calls have been drained. */
61
  private volatile boolean passThrough;
62
  /**
63
   * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
64
   * order, but also used if an error occurs before {@code realCall} is set.
65
   */
66
  private Listener<RespT> listener;
67
  // Must hold {@code this} lock when setting.
68
  private ClientCall<ReqT, RespT> realCall;
69
  @GuardedBy("this")
70
  private Status error;
71
  @GuardedBy("this")
1✔
72
  private List<Runnable> pendingRunnables = new ArrayList<>();
73
  @GuardedBy("this")
74
  private DelayedListener<RespT> delayedListener;
75

76
  protected DelayedClientCall(
77
      Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
1✔
78
    this.callExecutor = checkNotNull(callExecutor, "callExecutor");
1✔
79
    checkNotNull(scheduler, "scheduler");
1✔
80
    context = Context.current();
1✔
81
    initialDeadlineMonitor = scheduleDeadlineIfNeeded(scheduler, deadline);
1✔
82
  }
1✔
83

84
  // If one argument is null, consider the other the "Before"
85
  private boolean isAbeforeB(@Nullable Deadline a, @Nullable Deadline b) {
86
    if (b == null) {
1✔
87
      return true;
×
88
    } else if (a == null) {
1✔
89
      return false;
1✔
90
    }
91

92
    return a.isBefore(b);
×
93
  }
94

95
  @Nullable
96
  private ScheduledFuture<?> scheduleDeadlineIfNeeded(
97
      ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
98
    Deadline contextDeadline = context.getDeadline();
1✔
99
    if (deadline == null && contextDeadline == null) {
1✔
100
      return null;
1✔
101
    }
102
    long remainingNanos = Long.MAX_VALUE;
1✔
103
    if (deadline != null) {
1✔
104
      remainingNanos = deadline.timeRemaining(NANOSECONDS);
1✔
105
    }
106

107
    if (contextDeadline != null && contextDeadline.timeRemaining(NANOSECONDS) < remainingNanos) {
1✔
108
      remainingNanos = contextDeadline.timeRemaining(NANOSECONDS);
×
109
      if (logger.isLoggable(Level.FINE)) {
×
110
        StringBuilder builder =
×
111
            new StringBuilder(
112
                String.format(
×
113
                    Locale.US,
114
                    "Call timeout set to '%d' ns, due to context deadline.", remainingNanos));
×
115
        if (deadline == null) {
×
116
          builder.append(" Explicit call timeout was not set.");
×
117
        } else {
118
          long callTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS);
×
119
          builder.append(String.format(
×
120
              Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
×
121
        }
122
        logger.fine(builder.toString());
×
123
      }
124
    }
125

126
    long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
1✔
127
    long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
1✔
128
    final StringBuilder buf = new StringBuilder();
1✔
129
    String deadlineName = isAbeforeB(contextDeadline, deadline) ? "Context" : "CallOptions";
1✔
130
    if (remainingNanos < 0) {
1✔
131
      buf.append("ClientCall started after ");
1✔
132
      buf.append(deadlineName);
1✔
133
      buf.append(" deadline was exceeded. Deadline has been exceeded for ");
1✔
134
    } else {
135
      buf.append("Deadline ");
1✔
136
      buf.append(deadlineName);
1✔
137
      buf.append(" will be exceeded in ");
1✔
138
    }
139
    buf.append(seconds);
1✔
140
    buf.append(String.format(Locale.US, ".%09d", nanos));
1✔
141
    buf.append("s. ");
1✔
142

143
    /* Cancels the call if deadline exceeded prior to the real call being set. */
144
    class DeadlineExceededRunnable implements Runnable {
1✔
145
      @Override
146
      public void run() {
147
        cancel(
1✔
148
            Status.DEADLINE_EXCEEDED.withDescription(buf.toString()),
1✔
149
            // We should not cancel the call if the realCall is set because there could be a
150
            // race between cancel() and realCall.start(). The realCall will handle deadline by
151
            // itself.
152
            /* onlyCancelPendingCall= */ true);
153
      }
1✔
154
    }
155

156
    return scheduler.schedule(new DeadlineExceededRunnable(), remainingNanos, NANOSECONDS);
1✔
157
  }
158

159
  /**
160
   * Transfers all pending and future requests and mutations to the given call.
161
   *
162
   * <p>No-op if either this method or {@link #cancel} have already been called.
163
   */
164
  // When this method returns, passThrough is guaranteed to be true
165
  public final Runnable setCall(ClientCall<ReqT, RespT> call) {
166
    synchronized (this) {
1✔
167
      // If realCall != null, then either setCall() or cancel() has been called.
168
      if (realCall != null) {
1✔
169
        return null;
1✔
170
      }
171
      setRealCall(checkNotNull(call, "call"));
1✔
172
    }
1✔
173
    return new ContextRunnable(context) {
1✔
174
      @Override
175
      public void runInContext() {
176
        drainPendingCalls();
1✔
177
      }
1✔
178
    };
179
  }
180

181
  @Override
182
  public final void start(Listener<RespT> listener, final Metadata headers) {
183
    checkState(this.listener == null, "already started");
1✔
184
    Status savedError;
185
    boolean savedPassThrough;
186
    synchronized (this) {
1✔
187
      this.listener = checkNotNull(listener, "listener");
1✔
188
      // If error != null, then cancel() has been called and was unable to close the listener
189
      savedError = error;
1✔
190
      savedPassThrough = passThrough;
1✔
191
      if (!savedPassThrough) {
1✔
192
        listener = delayedListener = new DelayedListener<>(listener);
1✔
193
      }
194
    }
1✔
195
    if (savedError != null) {
1✔
196
      callExecutor.execute(new CloseListenerRunnable(listener, savedError));
1✔
197
      return;
1✔
198
    }
199
    if (savedPassThrough) {
1✔
200
      realCall.start(listener, headers);
1✔
201
    } else {
202
      final Listener<RespT> finalListener = listener;
1✔
203
      delayOrExecute(new Runnable() {
1✔
204
        @Override
205
        public void run() {
206
          realCall.start(finalListener, headers);
1✔
207
        }
1✔
208
      });
209
    }
210
  }
1✔
211

212
  // When this method returns, passThrough is guaranteed to be true
213
  @Override
214
  public final void cancel(@Nullable final String message, @Nullable final Throwable cause) {
215
    Status status = Status.CANCELLED;
1✔
216
    if (message != null) {
1✔
217
      status = status.withDescription(message);
1✔
218
    } else {
219
      status = status.withDescription("Call cancelled without message");
1✔
220
    }
221
    if (cause != null) {
1✔
222
      status = status.withCause(cause);
1✔
223
    }
224
    cancel(status, false);
1✔
225
  }
1✔
226

227
  /**
228
   * Cancels the call unless {@code realCall} is set and {@code onlyCancelPendingCall} is true.
229
   */
230
  private void cancel(final Status status, boolean onlyCancelPendingCall) {
231
    boolean delegateToRealCall = true;
1✔
232
    Listener<RespT> listenerToClose = null;
1✔
233
    synchronized (this) {
1✔
234
      // If realCall != null, then either setCall() or cancel() has been called
235
      if (realCall == null) {
1✔
236
        @SuppressWarnings("unchecked")
237
        ClientCall<ReqT, RespT> noopCall = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
238
        setRealCall(noopCall);
1✔
239
        delegateToRealCall = false;
1✔
240
        // If listener == null, then start() will later call listener with 'error'
241
        listenerToClose = listener;
1✔
242
        error = status;
1✔
243
      } else if (onlyCancelPendingCall) {
1✔
244
        return;
×
245
      }
246
    }
1✔
247
    if (delegateToRealCall) {
1✔
248
      delayOrExecute(new Runnable() {
1✔
249
        @Override
250
        public void run() {
251
          realCall.cancel(status.getDescription(), status.getCause());
1✔
252
        }
1✔
253
      });
254
    } else {
255
      if (listenerToClose != null) {
1✔
256
        callExecutor.execute(new CloseListenerRunnable(listenerToClose, status));
1✔
257
      }
258
      drainPendingCalls();
1✔
259
    }
260
    callCancelled();
1✔
261
  }
1✔
262

263
  protected void callCancelled() {
264
  }
1✔
265

266
  private void delayOrExecute(Runnable runnable) {
267
    synchronized (this) {
1✔
268
      if (!passThrough) {
1✔
269
        pendingRunnables.add(runnable);
1✔
270
        return;
1✔
271
      }
272
    }
1✔
273
    runnable.run();
1✔
274
  }
1✔
275

276
  /**
277
   * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
278
   * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
279
   * should not be held when calling this method.
280
   */
281
  private void drainPendingCalls() {
282
    assert realCall != null;
1✔
283
    assert !passThrough;
1✔
284
    List<Runnable> toRun = new ArrayList<>();
1✔
285
    DelayedListener<RespT> delayedListener ;
286
    while (true) {
287
      synchronized (this) {
1✔
288
        if (pendingRunnables.isEmpty()) {
1✔
289
          pendingRunnables = null;
1✔
290
          passThrough = true;
1✔
291
          delayedListener = this.delayedListener;
1✔
292
          break;
1✔
293
        }
294
        // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
295
        // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
296
        // drop the lock. So we will have to re-check pendingCalls.
297
        List<Runnable> tmp = toRun;
1✔
298
        toRun = pendingRunnables;
1✔
299
        pendingRunnables = tmp;
1✔
300
      }
1✔
301
      for (Runnable runnable : toRun) {
1✔
302
        // Must not call transport while lock is held to prevent deadlocks.
303
        // TODO(ejona): exception handling
304
        runnable.run();
1✔
305
      }
1✔
306
      toRun.clear();
1✔
307
    }
308
    if (delayedListener != null) {
1✔
309
      final DelayedListener<RespT> listener = delayedListener;
1✔
310
      class DrainListenerRunnable extends ContextRunnable {
311
        DrainListenerRunnable() {
1✔
312
          super(context);
1✔
313
        }
1✔
314

315
        @Override
316
        public void runInContext() {
317
          listener.drainPendingCallbacks();
1✔
318
        }
1✔
319
      }
320

321
      callExecutor.execute(new DrainListenerRunnable());
1✔
322
    }
323
  }
1✔
324

325
  @GuardedBy("this")
326
  private void setRealCall(ClientCall<ReqT, RespT> realCall) {
327
    checkState(this.realCall == null, "realCall already set to %s", this.realCall);
1✔
328
    if (initialDeadlineMonitor != null) {
1✔
329
      initialDeadlineMonitor.cancel(false);
1✔
330
    }
331
    this.realCall = realCall;
1✔
332
  }
1✔
333

334
  @VisibleForTesting
335
  final ClientCall<ReqT, RespT> getRealCall() {
336
    return realCall;
×
337
  }
338

339
  @Override
340
  public final void sendMessage(final ReqT message) {
341
    if (passThrough) {
1✔
342
      realCall.sendMessage(message);
1✔
343
    } else {
344
      delayOrExecute(new Runnable() {
1✔
345
        @Override
346
        public void run() {
347
          realCall.sendMessage(message);
1✔
348
        }
1✔
349
      });
350
    }
351
  }
1✔
352

353
  @Override
354
  public final void setMessageCompression(final boolean enable) {
355
    if (passThrough) {
1✔
356
      realCall.setMessageCompression(enable);
1✔
357
    } else {
358
      delayOrExecute(new Runnable() {
1✔
359
        @Override
360
        public void run() {
361
          realCall.setMessageCompression(enable);
1✔
362
        }
1✔
363
      });
364
    }
365
  }
1✔
366

367
  @Override
368
  public final void request(final int numMessages) {
369
    if (passThrough) {
1✔
370
      realCall.request(numMessages);
1✔
371
    } else {
372
      delayOrExecute(new Runnable() {
1✔
373
        @Override
374
        public void run() {
375
          realCall.request(numMessages);
1✔
376
        }
1✔
377
      });
378
    }
379
  }
1✔
380

381
  @Override
382
  public final void halfClose() {
383
    delayOrExecute(new Runnable() {
1✔
384
      @Override
385
      public void run() {
386
        realCall.halfClose();
1✔
387
      }
1✔
388
    });
389
  }
1✔
390

391
  @Override
392
  public final boolean isReady() {
393
    if (passThrough) {
1✔
394
      return realCall.isReady();
1✔
395
    } else {
396
      return false;
×
397
    }
398
  }
399

400
  @Override
401
  public final Attributes getAttributes() {
402
    ClientCall<ReqT, RespT> savedRealCall;
403
    synchronized (this) {
1✔
404
      savedRealCall = realCall;
1✔
405
    }
1✔
406
    if (savedRealCall != null) {
1✔
407
      return savedRealCall.getAttributes();
1✔
408
    } else {
409
      return Attributes.EMPTY;
×
410
    }
411
  }
412

413
  @Override
414
  public String toString() {
415
    return MoreObjects.toStringHelper(this)
×
416
        .add("realCall", realCall)
×
417
        .toString();
×
418
  }
419

420
  private final class CloseListenerRunnable extends ContextRunnable {
421
    final Listener<RespT> listener;
422
    final Status status;
423

424
    CloseListenerRunnable(Listener<RespT> listener, Status status) {
1✔
425
      super(context);
1✔
426
      this.listener = listener;
1✔
427
      this.status = status;
1✔
428
    }
1✔
429

430
    @Override
431
    public void runInContext() {
432
      listener.onClose(status, new Metadata());
1✔
433
    }
1✔
434
  }
435

436
  private static final class DelayedListener<RespT> extends Listener<RespT> {
1✔
437
    private final Listener<RespT> realListener;
438
    private volatile boolean passThrough;
439
    @GuardedBy("this")
1✔
440
    private List<Runnable> pendingCallbacks = new ArrayList<>();
441

442
    public DelayedListener(Listener<RespT> listener) {
1✔
443
      this.realListener = listener;
1✔
444
    }
1✔
445

446
    private void delayOrExecute(Runnable runnable) {
447
      synchronized (this) {
1✔
448
        if (!passThrough) {
1✔
449
          pendingCallbacks.add(runnable);
1✔
450
          return;
1✔
451
        }
452
      }
1✔
453
      runnable.run();
1✔
454
    }
1✔
455

456
    @Override
457
    public void onHeaders(final Metadata headers) {
458
      if (passThrough) {
1✔
459
        realListener.onHeaders(headers);
1✔
460
      } else {
461
        delayOrExecute(new Runnable() {
×
462
          @Override
463
          public void run() {
464
            realListener.onHeaders(headers);
×
465
          }
×
466
        });
467
      }
468
    }
1✔
469

470
    @Override
471
    public void onMessage(final RespT message) {
472
      if (passThrough) {
1✔
473
        realListener.onMessage(message);
1✔
474
      } else {
475
        delayOrExecute(new Runnable() {
×
476
          @Override
477
          public void run() {
478
            realListener.onMessage(message);
×
479
          }
×
480
        });
481
      }
482
    }
1✔
483

484
    @Override
485
    public void onClose(final Status status, final Metadata trailers) {
486
      delayOrExecute(new Runnable() {
1✔
487
        @Override
488
        public void run() {
489
          realListener.onClose(status, trailers);
1✔
490
        }
1✔
491
      });
492
    }
1✔
493

494
    @Override
495
    public void onReady() {
496
      if (passThrough) {
1✔
497
        realListener.onReady();
1✔
498
      } else {
499
        delayOrExecute(new Runnable() {
1✔
500
          @Override
501
          public void run() {
502
            realListener.onReady();
1✔
503
          }
1✔
504
        });
505
      }
506
    }
1✔
507

508
    void drainPendingCallbacks() {
509
      assert !passThrough;
1✔
510
      List<Runnable> toRun = new ArrayList<>();
1✔
511
      while (true) {
512
        synchronized (this) {
1✔
513
          if (pendingCallbacks.isEmpty()) {
1✔
514
            pendingCallbacks = null;
1✔
515
            passThrough = true;
1✔
516
            break;
1✔
517
          }
518
          // Since there were pendingCallbacks, we need to process them. To maintain ordering we
519
          // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
520
          // added after we drop the lock. So we will have to re-check pendingCallbacks.
521
          List<Runnable> tmp = toRun;
1✔
522
          toRun = pendingCallbacks;
1✔
523
          pendingCallbacks = tmp;
1✔
524
        }
1✔
525
        for (Runnable runnable : toRun) {
1✔
526
          // Avoid calling listener while lock is held to prevent deadlocks.
527
          // TODO(ejona): exception handling
528
          runnable.run();
1✔
529
        }
1✔
530
        toRun.clear();
1✔
531
      }
532
    }
1✔
533
  }
534

535
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
536
    @Override
537
    public void start(Listener<Object> responseListener, Metadata headers) {}
1✔
538

539
    @Override
540
    public void request(int numMessages) {}
1✔
541

542
    @Override
543
    public void cancel(String message, Throwable cause) {}
1✔
544

545
    @Override
546
    public void halfClose() {}
1✔
547

548
    @Override
549
    public void sendMessage(Object message) {}
1✔
550

551
    // Always returns {@code false}, since this is only used when the startup of the call fails.
552
    @Override
553
    public boolean isReady() {
554
      return false;
×
555
    }
556
  };
557
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc