• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19680

05 Feb 2025 10:18PM CUT coverage: 88.617% (+0.03%) from 88.584%
#19680

push

github

web-flow
core: updates the backoff range as per the A6 redefinition (#11858)

* core: updates the backoff range being used from [0, 1] to [0.8, 1.2] as per the A6 redefinition

* adds a flag for experimental jitter

* xds: Allow FaultFilter's interceptor to be reused

This is the only usage of PickSubchannelArgs when creating a filter's
ClientInterceptor, and a follow-up commit will remove the argument and
actually reuse the interceptors. Other filters' interceptors can
already be reused.

There doesn't seem to be any significant loss of legibility by making
FaultFilter a more ordinary interceptor, but the change does cause the
ForwardingClientCall to be present when faultDelay is configured,
independent of whether the fault delay ends up being triggered.

Reusing interceptors will move more state management out of the RPC path
which will be more relevant with RLQS.

* netty: Removed 4096 min buffer size (#11856)

* netty: Removed 4096 min buffer size

* turns the flag into a var for better efficiency

---------

Co-authored-by: Eric Anderson <ejona@google.com>

33779 of 38118 relevant lines covered (88.62%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.5
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.Compressor;
30
import io.grpc.Deadline;
31
import io.grpc.DecompressorRegistry;
32
import io.grpc.Metadata;
33
import io.grpc.MethodDescriptor;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import java.io.InputStream;
38
import java.lang.Thread.UncaughtExceptionHandler;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.List;
43
import java.util.Random;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.concurrent.atomic.AtomicInteger;
50
import java.util.concurrent.atomic.AtomicLong;
51
import javax.annotation.CheckForNull;
52
import javax.annotation.Nullable;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  /** Header carrying the previous-attempt count; updateHeaders() adds it when the count is > 0. */
  @VisibleForTesting
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);

  /** Retry pushback header key, in milliseconds (presumably server-sent; read outside view). */
  @VisibleForTesting
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);

  /** Status used to cancel substreams that lost after the RPC committed to another substream. */
  private static final Status CANCELLED_BECAUSE_COMMITTED =
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");

  private final MethodDescriptor<ReqT, ?> method;
  private final Executor callExecutor;
  // Serializes callbacks to masterListener (e.g. onReady() from drain(), closed() from the commit
  // task). An exception escaping a callback is rethrown wrapped as a Status runtime exception.
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
      new UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          throw Status.fromThrowable(e)
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
              .asRuntimeException();
        }
      }
  );
  private final ScheduledExecutorService scheduledExecutorService;
  // Must not modify it. Per-attempt copies are made by updateHeaders().
  private final Metadata headers;
  @Nullable
  private final RetryPolicy retryPolicy;
  @Nullable
  private final HedgingPolicy hedgingPolicy;
  // True iff a hedgingPolicy was supplied; retry and hedging are mutually exclusive.
  private final boolean isHedging;

  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
  private final Object lock = new Object();

  private final ChannelBufferMeter channelBufferUsed;
  private final long perRpcBufferLimit;
  private final long channelBufferLimit;
  @Nullable
  private final Throttle throttle;
  @GuardedBy("lock")
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();

  // Replaced wholesale under lock. It is volatile so that fast paths (sendMessage, request, flush,
  // isReady) can read a snapshot without locking. The buffer list it holds is mutated in place,
  // but only under lock.
  private volatile State state = new State(
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
      false, 0);

  /**
   * Either non-local transparent retry happened or reached server's application logic.
   *
   * <p>Note that local-only transparent retries are unlimited.
   */
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
  // Number of substreams created and not yet closed. createSubstream() increments it and refuses
  // to create more once it is negative. safeCloseMasterListener() makes it negative by adding
  // Integer.MIN_VALUE.
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
  // Arguments for the deferred masterListener.closed() call, recorded by safeCloseMasterListener().
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;

  // Used for recording the share of buffer used for the current call out of the channel buffer.
  // This field would not be necessary if there is no channel buffer limit.
  @GuardedBy("lock")
  private long perRpcBufferUsed;

  // The application-facing listener, set in start().
  private ClientStreamListener masterListener;
  @GuardedBy("lock")
  private FutureCanceller scheduledRetry;
  @GuardedBy("lock")
  private FutureCanceller scheduledHedging;
  // Initialized from retryPolicy.initialBackoffNanos when a retry policy is present.
  private long nextBackoffIntervalNanos;
  // Set by cancel() when the winning substream is still draining; drain() cancels with it.
  private Status cancellationStatus;
  // Set to true just before masterListener.closed() is delivered; suppresses later onReady().
  private boolean isClosed;
126

127
  /**
   * Creates a retriable stream. At most one of {@code retryPolicy} and {@code hedgingPolicy} may
   * be non-null. When neither is set, the stream still buffers but never schedules extra attempts.
   *
   * @throws IllegalArgumentException if both a retry policy and a hedging policy are given
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method, Metadata headers,
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
      @Nullable Throttle throttle) {
    // Retry and hedging are mutually exclusive; fail before touching any state.
    checkArgument(
        retryPolicy == null || hedgingPolicy == null,
        "Should not provide both retryPolicy and hedgingPolicy");

    this.method = method;
    this.headers = headers;

    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;

    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;

    this.retryPolicy = retryPolicy;
    this.hedgingPolicy = hedgingPolicy;
    this.isHedging = hedgingPolicy != null;
    this.throttle = throttle;

    // The first backoff is the policy's initial value; it stays 0 when retry is disabled.
    this.nextBackoffIntervalNanos = retryPolicy == null ? 0L : retryPolicy.initialBackoffNanos;
  }
1✔
151

152
  /**
   * Commits the RPC to {@code winningSubstream}.
   *
   * <p>Under {@code lock} this marks the state committed, gives this RPC's buffer share back to
   * {@code channelBufferUsed}, and marks any pending retry/hedging timer as cancelled. Work that
   * must not run under the lock is packaged into the returned task. That task cancels the other
   * drained substreams, cancels the timer futures, may close the master listener, and finally
   * calls {@link #postCommit()}.
   *
   * @return the post-commit task, or {@code null} if the RPC was already committed
   */
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==FutureCanceller.lock so ok
  @Nullable // null if already committed
  @CheckReturnValue
  private Runnable commit(final Substream winningSubstream) {
    synchronized (lock) {
      if (state.winningSubstream != null) {
        return null;
      }
      // Captured before committing; these are the substreams the task may need to cancel.
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;

      state = state.committed(winningSubstream);

      // subtract the share of this RPC from channelBufferUsed.
      channelBufferUsed.addAndGet(-perRpcBufferUsed);

      // Whether the pending retry timer had already been cancelled before this commit.
      final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
      final Future<?> retryFuture;
      if (scheduledRetry != null) {
        retryFuture = scheduledRetry.markCancelled();
        scheduledRetry = null;
      } else {
        retryFuture = null;
      }
      // cancel the scheduled hedging if it is scheduled prior to the commitment
      final Future<?> hedgingFuture;
      if (scheduledHedging != null) {
        hedgingFuture = scheduledHedging.markCancelled();
        scheduledHedging = null;
      } else {
        hedgingFuture = null;
      }

      class CommitTask implements Runnable {
        @Override
        public void run() {
          // For hedging only, not needed for normal retry
          for (Substream substream : savedDrainedSubstreams) {
            if (substream != winningSubstream) {
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
            }
          }
          if (retryFuture != null) {
            retryFuture.cancel(false);
            // A pending, not-yet-cancelled retry held one in-flight slot (presumably reserved when
            // it was scheduled; that code is outside this view). Release it here. If the counter
            // then equals Integer.MIN_VALUE, safeCloseMasterListener() has already run and no
            // substream remains in flight, so the master listener must be closed from here.
            if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
              assert savedCloseMasterListenerReason != null;
              listenerSerializeExecutor.execute(
                  new Runnable() {
                    @Override
                    public void run() {
                      isClosed = true;
                      masterListener.closed(savedCloseMasterListenerReason.status,
                          savedCloseMasterListenerReason.progress,
                          savedCloseMasterListenerReason.metadata);
                    }
                  });
            }
          }

          if (hedgingFuture != null) {
            hedgingFuture.cancel(false);
          }

          postCommit();
        }
      }

      return new CommitTask();
    }
  }
221

222
  /**
   * Subclass hook run as the last step of the post-commit task produced by {@code commit()}. It
   * runs after losing substreams and pending retry/hedging timers have been cancelled.
   */
  abstract void postCommit();
223

224
  /**
225
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
226
   * for only once. The post commit task cancels other non-winning streams on separate transport
227
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
228
   * transports.(issues/10314)
229
   * This method should be called only in subListener callbacks. This guarantees callExecutor
230
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
231
   * decorative. That is because:
232
   * For a successful winning stream, other streams won't attempt to close master listener.
233
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
234
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
235
   * callExecutor scheduling happens-before that.
236
   */
237
  private void commitAndRun(Substream winningSubstream) {
238
    Runnable postCommitTask = commit(winningSubstream);
1✔
239

240
    if (postCommitTask != null) {
1✔
241
      callExecutor.execute(postCommitTask);
1✔
242
    }
243
  }
1✔
244

245
  // returns null means we should not create new sub streams, e.g. cancelled or
246
  // other close condition is met for retriableStream.
247
  @Nullable
248
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
249
    int inFlight;
250
    do {
251
      inFlight = inFlightSubStreams.get();
1✔
252
      if (inFlight < 0) {
1✔
253
        return null;
×
254
      }
255
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
256
    Substream sub = new Substream(previousAttemptCount);
1✔
257
    // one tracer per substream
258
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
259
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
260
      @Override
261
      public ClientStreamTracer newClientStreamTracer(
262
          ClientStreamTracer.StreamInfo info, Metadata headers) {
263
        return bufferSizeTracer;
1✔
264
      }
265
    };
266

267
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
268
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
269
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
1✔
270
    return sub;
1✔
271
  }
272

273
  /**
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
   * client stream is not yet started.
   *
   * @param headers headers for this attempt. When called from {@code createSubstream()}, this is
   *     a fresh copy that includes grpc-previous-rpc-attempts if {@code previousAttempts > 0}.
   * @param tracerFactory factory that supplies this attempt's stream tracer
   * @param previousAttempts number of attempts made before this one
   * @param isTransparentRetry whether this attempt is a transparent retry
   */
  abstract ClientStream newSubstream(
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
      boolean isTransparentRetry);
280

281
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
282
  @VisibleForTesting
283
  final Metadata updateHeaders(
284
      Metadata originalHeaders, int previousAttemptCount) {
285
    Metadata newHeaders = new Metadata();
1✔
286
    newHeaders.merge(originalHeaders);
1✔
287
    if (previousAttemptCount > 0) {
1✔
288
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
289
    }
290
    return newHeaders;
1✔
291
  }
292

293
  /**
   * Replays the buffered entries onto {@code substream} until it has caught up.
   *
   * <p>Entries are copied under {@code lock} in chunks of up to 0x80 and then run outside the
   * lock. Once the substream has consumed the whole buffer, it is recorded as drained. From then
   * on, delayOrExecute() runs new entries on it directly. If the stream is ready, onReady() is
   * then delivered to the master listener through listenerSerializeExecutor. If the RPC commits
   * to a different substream, or is cancelled, draining stops and this substream is cancelled.
   * It is started first if needed, so that its closed callback still fires.
   */
  private void drain(Substream substream) {
    int index = 0;
    int chunk = 0x80;
    List<BufferEntry> list = null;
    boolean streamStarted = false;
    Runnable onReadyRunnable = null;

    while (true) {
      State savedState;

      synchronized (lock) {
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me, to be cancelled
          break;
        }
        if (savedState.cancelled) {
          break;
        }
        if (index == savedState.buffer.size()) { // I'm drained
          state = savedState.substreamDrained(substream);
          // isReady() now also considers this substream, since it was just marked drained.
          if (!isReady()) {
            return;
          }
          onReadyRunnable = new Runnable() {
            @Override
            public void run() {
              if (!isClosed) {
                masterListener.onReady();
              }
            }
          };
          break;
        }

        if (substream.closed) {
          return;
        }

        // Copy the next chunk under the lock; it is replayed below without holding the lock.
        int stop = Math.min(index + chunk, savedState.buffer.size());
        if (list == null) {
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
        } else {
          list.clear();
          list.addAll(savedState.buffer.subList(index, stop));
        }
        index = stop;
      }

      for (BufferEntry bufferEntry : list) {
        bufferEntry.runWith(substream);
        if (bufferEntry instanceof RetriableStream.StartEntry) {
          streamStarted = true;
        }
        // These breaks leave only the for loop. The next pass of the while loop re-checks the
        // same conditions under the lock and then exits.
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me, to be cancelled
          break;
        }
        if (savedState.cancelled) {
          break;
        }
      }
    }

    if (onReadyRunnable != null) {
      listenerSerializeExecutor.execute(onReadyRunnable);
      return;
    }

    if (!streamStarted) {
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
      substream.stream.start(new Sublistener(substream));
    }
    // If this substream is the winner, the RPC itself was cancelled while this substream was
    // draining, so use the reason saved by cancel(). Otherwise it lost the commit.
    substream.stream.cancel(
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
  }
1✔
370

371
  /**
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
   *
   * @return {@code null} to proceed with the RPC. Otherwise, the status with which
   *     {@code start()} cancels this stream instead of creating the first attempt.
   */
  @CheckReturnValue
  @Nullable
  abstract Status prestart();
377

378
  class StartEntry implements BufferEntry {
1✔
379
    @Override
380
    public void runWith(Substream substream) {
381
      substream.stream.start(new Sublistener(substream));
1✔
382
    }
1✔
383
  }
384

385
  /** Starts the first PRC attempt. */
386
  @Override
387
  public final void start(ClientStreamListener listener) {
388
    masterListener = listener;
1✔
389

390
    Status shutdownStatus = prestart();
1✔
391

392
    if (shutdownStatus != null) {
1✔
393
      cancel(shutdownStatus);
1✔
394
      return;
1✔
395
    }
396

397
    synchronized (lock) {
1✔
398
      state.buffer.add(new StartEntry());
1✔
399
    }
1✔
400

401
    Substream substream = createSubstream(0, false);
1✔
402
    if (substream == null) {
1✔
403
      return;
×
404
    }
405
    if (isHedging) {
1✔
406
      FutureCanceller scheduledHedgingRef = null;
1✔
407

408
      synchronized (lock) {
1✔
409
        state = state.addActiveHedge(substream);
1✔
410
        if (hasPotentialHedging(state)
1✔
411
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
412
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
413
        }
414
      }
1✔
415

416
      if (scheduledHedgingRef != null) {
1✔
417
        scheduledHedgingRef.setFuture(
1✔
418
            scheduledExecutorService.schedule(
1✔
419
                new HedgingRunnable(scheduledHedgingRef),
420
                hedgingPolicy.hedgingDelayNanos,
421
                TimeUnit.NANOSECONDS));
422
      }
423
    }
424

425
    drain(substream);
1✔
426
  }
1✔
427

428
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
429
  private void pushbackHedging(@Nullable Integer delayMillis) {
430
    if (delayMillis == null) {
1✔
431
      return;
1✔
432
    }
433
    if (delayMillis < 0) {
1✔
434
      freezeHedging();
1✔
435
      return;
1✔
436
    }
437

438
    // Cancels the current scheduledHedging and reschedules a new one.
439
    FutureCanceller future;
440
    Future<?> futureToBeCancelled;
441

442
    synchronized (lock) {
1✔
443
      if (scheduledHedging == null) {
1✔
444
        return;
×
445
      }
446

447
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
448
      scheduledHedging = future = new FutureCanceller(lock);
1✔
449
    }
1✔
450

451
    if (futureToBeCancelled != null) {
1✔
452
      futureToBeCancelled.cancel(false);
1✔
453
    }
454
    future.setFuture(scheduledExecutorService.schedule(
1✔
455
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
456
  }
1✔
457

458
  /**
   * Timer task that launches one hedged attempt.
   *
   * <p>The substream is created immediately on the timer thread. The rest of the work runs on
   * {@code callExecutor}:
   * <ul>
   *   <li>If this task's canceller was cancelled in the meantime (by commit, freeze, or pushback),
   *       the unneeded substream is started and cancelled right away.
   *   <li>Otherwise the substream is registered as an active hedge. The next hedge is scheduled
   *       if still allowed; if not, hedging is frozen. Finally the substream is drained.
   * </ul>
   */
  private final class HedgingRunnable implements Runnable {

    // Need to hold a ref to the FutureCanceller in case RetriableStream.scheduledHedging is renewed
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
    final FutureCanceller scheduledHedgingRef;

    HedgingRunnable(FutureCanceller scheduledHedging) {
      scheduledHedgingRef = scheduledHedging;
    }

    @Override
    public void run() {
      // It's safe to read state.hedgingAttemptCount here.
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
      // until state.addActiveHedge() is called subsequently, even though the state could possibly
      // change.
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
      if (newSubstream == null) {
        return;
      }
      callExecutor.execute(
          new Runnable() {
            @SuppressWarnings("GuardedBy")  //TODO(b/145386688) lock==FutureCanceller.lock so ok
            @Override
            public void run() {
              boolean cancelled = false;
              FutureCanceller future = null;

              synchronized (lock) {
                if (scheduledHedgingRef.isCancelled()) {
                  cancelled = true;
                } else {
                  state = state.addActiveHedge(newSubstream);
                  if (hasPotentialHedging(state)
                      && (throttle == null || throttle.isAboveThreshold())) {
                    scheduledHedging = future = new FutureCanceller(lock);
                  } else {
                    // No further hedge is possible or allowed: stop hedging for good.
                    state = state.freezeHedging();
                    scheduledHedging = null;
                  }
                }
              }

              if (cancelled) {
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
                newSubstream.stream.start(new Sublistener(newSubstream));
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
                return;
              }
              if (future != null) {
                future.setFuture(
                    scheduledExecutorService.schedule(
                        new HedgingRunnable(future),
                        hedgingPolicy.hedgingDelayNanos,
                        TimeUnit.NANOSECONDS));
              }
              drain(newSubstream);
            }
          });
    }
  }
519

520
  /**
   * Cancels the RPC with {@code reason}.
   *
   * <p>If nothing has committed yet, the RPC commits to a no-op substream, so every real
   * substream loses. In that case the post-commit task runs inline on this thread, not on
   * callExecutor as in commitAndRun(), and the master listener is closed with {@code reason}
   * via safeCloseMasterListener().
   *
   * <p>If the RPC is already committed, the winning substream is cancelled directly when it is
   * drained. Otherwise {@code reason} is saved so that drain() cancels the winner with it.
   */
  @Override
  public final void cancel(final Status reason) {
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
    noopSubstream.stream = new NoopClientStream();
    Runnable runnable = commit(noopSubstream);

    if (runnable != null) {
      synchronized (lock) {
        state = state.substreamDrained(noopSubstream);
      }
      runnable.run();
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
      return;
    }

    Substream winningSubstreamToCancel = null;
    synchronized (lock) {
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
        winningSubstreamToCancel = state.winningSubstream;
      } else { // the winningSubstream will be cancelled while draining
        cancellationStatus = reason;
      }
      state = state.cancelled();
    }
    // Cancel outside the lock.
    if (winningSubstreamToCancel != null) {
      winningSubstreamToCancel.stream.cancel(reason);
    }
  }
1✔
548

549
  private void delayOrExecute(BufferEntry bufferEntry) {
550
    Collection<Substream> savedDrainedSubstreams;
551
    synchronized (lock) {
1✔
552
      if (!state.passThrough) {
1✔
553
        state.buffer.add(bufferEntry);
1✔
554
      }
555
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
556
    }
1✔
557

558
    for (Substream substream : savedDrainedSubstreams) {
1✔
559
      bufferEntry.runWith(substream);
1✔
560
    }
1✔
561
  }
1✔
562

563
  /**
   * Not supported. Use {@link #sendMessage(Object)} instead: buffering for retries stores the
   * request objects and serializes them separately for each substream, rather than buffering
   * InputStreams.
   *
   * @throws IllegalStateException always
   */
  @Override
  public final void writeMessage(InputStream message) {
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
  }
571

572
  final void sendMessage(final ReqT message) {
573
    State savedState = state;
1✔
574
    if (savedState.passThrough) {
1✔
575
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
576
      return;
1✔
577
    }
578

579
    class SendMessageEntry implements BufferEntry {
1✔
580
      @Override
581
      public void runWith(Substream substream) {
582
        substream.stream.writeMessage(method.streamRequest(message));
1✔
583
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
584
        // flushes (or half close), but retry appears to have a code path that the flushes may
585
        // not happen. The code needs to be fixed and this removed. See #9340.
586
        substream.stream.flush();
1✔
587
      }
1✔
588
    }
589

590
    delayOrExecute(new SendMessageEntry());
1✔
591
  }
1✔
592

593
  @Override
594
  public final void request(final int numMessages) {
595
    State savedState = state;
1✔
596
    if (savedState.passThrough) {
1✔
597
      savedState.winningSubstream.stream.request(numMessages);
1✔
598
      return;
1✔
599
    }
600

601
    class RequestEntry implements BufferEntry {
1✔
602
      @Override
603
      public void runWith(Substream substream) {
604
        substream.stream.request(numMessages);
1✔
605
      }
1✔
606
    }
607

608
    delayOrExecute(new RequestEntry());
1✔
609
  }
1✔
610

611
  @Override
612
  public final void flush() {
613
    State savedState = state;
1✔
614
    if (savedState.passThrough) {
1✔
615
      savedState.winningSubstream.stream.flush();
1✔
616
      return;
1✔
617
    }
618

619
    class FlushEntry implements BufferEntry {
1✔
620
      @Override
621
      public void runWith(Substream substream) {
622
        substream.stream.flush();
1✔
623
      }
1✔
624
    }
625

626
    delayOrExecute(new FlushEntry());
1✔
627
  }
1✔
628

629
  @Override
630
  public final boolean isReady() {
631
    for (Substream substream : state.drainedSubstreams) {
1✔
632
      if (substream.stream.isReady()) {
1✔
633
        return true;
1✔
634
      }
635
    }
1✔
636
    return false;
1✔
637
  }
638

639
  @Override
640
  public void optimizeForDirectExecutor() {
641
    class OptimizeDirectEntry implements BufferEntry {
1✔
642
      @Override
643
      public void runWith(Substream substream) {
644
        substream.stream.optimizeForDirectExecutor();
1✔
645
      }
1✔
646
    }
647

648
    delayOrExecute(new OptimizeDirectEntry());
1✔
649
  }
1✔
650

651
  @Override
652
  public final void setCompressor(final Compressor compressor) {
653
    class CompressorEntry implements BufferEntry {
1✔
654
      @Override
655
      public void runWith(Substream substream) {
656
        substream.stream.setCompressor(compressor);
1✔
657
      }
1✔
658
    }
659

660
    delayOrExecute(new CompressorEntry());
1✔
661
  }
1✔
662

663
  @Override
664
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
665
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
666
      @Override
667
      public void runWith(Substream substream) {
668
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
669
      }
1✔
670
    }
671

672
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
673
  }
1✔
674

675
  @Override
676
  public final void setMessageCompression(final boolean enable) {
677
    class MessageCompressionEntry implements BufferEntry {
1✔
678
      @Override
679
      public void runWith(Substream substream) {
680
        substream.stream.setMessageCompression(enable);
1✔
681
      }
1✔
682
    }
683

684
    delayOrExecute(new MessageCompressionEntry());
1✔
685
  }
1✔
686

687
  @Override
688
  public final void halfClose() {
689
    class HalfCloseEntry implements BufferEntry {
1✔
690
      @Override
691
      public void runWith(Substream substream) {
692
        substream.stream.halfClose();
1✔
693
      }
1✔
694
    }
695

696
    delayOrExecute(new HalfCloseEntry());
1✔
697
  }
1✔
698

699
  @Override
700
  public final void setAuthority(final String authority) {
701
    class AuthorityEntry implements BufferEntry {
1✔
702
      @Override
703
      public void runWith(Substream substream) {
704
        substream.stream.setAuthority(authority);
1✔
705
      }
1✔
706
    }
707

708
    delayOrExecute(new AuthorityEntry());
1✔
709
  }
1✔
710

711
  @Override
712
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
713
    class DecompressorRegistryEntry implements BufferEntry {
1✔
714
      @Override
715
      public void runWith(Substream substream) {
716
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
717
      }
1✔
718
    }
719

720
    delayOrExecute(new DecompressorRegistryEntry());
1✔
721
  }
1✔
722

723
  @Override
724
  public final void setMaxInboundMessageSize(final int maxSize) {
725
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
726
      @Override
727
      public void runWith(Substream substream) {
728
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
729
      }
1✔
730
    }
731

732
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
733
  }
1✔
734

735
  @Override
736
  public final void setMaxOutboundMessageSize(final int maxSize) {
737
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
738
      @Override
739
      public void runWith(Substream substream) {
740
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
741
      }
1✔
742
    }
743

744
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
745
  }
1✔
746

747
  @Override
748
  public final void setDeadline(final Deadline deadline) {
749
    class DeadlineEntry implements BufferEntry {
1✔
750
      @Override
751
      public void runWith(Substream substream) {
752
        substream.stream.setDeadline(deadline);
1✔
753
      }
1✔
754
    }
755

756
    delayOrExecute(new DeadlineEntry());
1✔
757
  }
1✔
758

759
  @Override
760
  public final Attributes getAttributes() {
761
    if (state.winningSubstream != null) {
1✔
762
      return state.winningSubstream.stream.getAttributes();
1✔
763
    }
764
    return Attributes.EMPTY;
×
765
  }
766

767
  @Override
768
  public void appendTimeoutInsight(InsightBuilder insight) {
769
    State currentState;
770
    synchronized (lock) {
1✔
771
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
772
      currentState = state;
1✔
773
    }
1✔
774
    if (currentState.winningSubstream != null) {
1✔
775
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
776
      // of the winning substream, they may not have received closed() notifications yet, thus they
777
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
778
      // We need to figure out how to include them.
779
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
780
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
781
      insight.appendKeyValue("committed", substreamInsight);
1✔
782
    } else {
1✔
783
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
784
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
785
      // and are still catching up with buffered requests (in other words, still draining) will not
786
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
787
      // be indistinguishable from the case where those streams are to be created a little late due
788
      // to delays in the timer.
789
      for (Substream sub : currentState.drainedSubstreams) {
1✔
790
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
791
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
792
        openSubstreamsInsight.append(substreamInsight);
1✔
793
      }
1✔
794
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
795
    }
796
  }
1✔
797

798
  private static Random random = new Random();
1✔
799

800
  @VisibleForTesting
801
  static void setRandom(Random random) {
802
    RetriableStream.random = random;
1✔
803
  }
1✔
804

805
  /**
806
   * Whether there is any potential hedge at the moment. A false return value implies there is
807
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
808
   * when calling this method, unless otherwise the rpc is committed.
809
   */
810
  // only called when isHedging is true
811
  @GuardedBy("lock")
812
  private boolean hasPotentialHedging(State state) {
813
    return state.winningSubstream == null
1✔
814
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
815
        && !state.hedgingFrozen;
816
  }
817

818
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
819
  private void freezeHedging() {
820
    Future<?> futureToBeCancelled = null;
1✔
821
    synchronized (lock) {
1✔
822
      if (scheduledHedging != null) {
1✔
823
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
824
        scheduledHedging = null;
1✔
825
      }
826
      state = state.freezeHedging();
1✔
827
    }
1✔
828

829
    if (futureToBeCancelled != null) {
1✔
830
      futureToBeCancelled.cancel(false);
1✔
831
    }
832
  }
1✔
833

834
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
835
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
836
        metadata);
837
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
838
      listenerSerializeExecutor.execute(
1✔
839
          new Runnable() {
1✔
840
            @Override
841
            public void run() {
842
              isClosed = true;
1✔
843
              masterListener.closed(status, progress, metadata);
1✔
844
            }
1✔
845
          });
846
    }
847
  }
1✔
848

849
  private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil
1✔
850
          .getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true);
1✔
851

852
  public static long intervalWithJitter(long intervalNanos) {
853
    double inverseJitterFactor = isExperimentalRetryJitterEnabled
1✔
854
            ? 0.8 * random.nextDouble() + 0.4 : random.nextDouble();
1✔
855
    return (long) (intervalNanos * inverseJitterFactor);
1✔
856
  }
857

858
  private static final class SavedCloseMasterListenerReason {
859
    private final Status status;
860
    private final RpcProgress progress;
861
    private final Metadata metadata;
862

863
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
864
      this.status = status;
1✔
865
      this.progress = progress;
1✔
866
      this.metadata = metadata;
1✔
867
    }
1✔
868
  }
869

870
  private interface BufferEntry {
871
    /** Replays the buffer entry with the given stream. */
872
    void runWith(Substream substream);
873
  }
874

875
  private final class Sublistener implements ClientStreamListener {
1✔
876
    final Substream substream;
877

878
    Sublistener(Substream substream) {
1✔
879
      this.substream = substream;
1✔
880
    }
1✔
881

882
    @Override
883
    public void headersRead(final Metadata headers) {
884
      if (substream.previousAttemptCount > 0) {
1✔
885
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
886
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
887
      }
888
      commitAndRun(substream);
1✔
889
      if (state.winningSubstream == substream) {
1✔
890
        if (throttle != null) {
1✔
891
          throttle.onSuccess();
1✔
892
        }
893
        listenerSerializeExecutor.execute(
1✔
894
            new Runnable() {
1✔
895
              @Override
896
              public void run() {
897
                masterListener.headersRead(headers);
1✔
898
              }
1✔
899
            });
900
      }
901
    }
1✔
902

903
    @Override
904
    public void closed(
905
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
906
      synchronized (lock) {
1✔
907
        state = state.substreamClosed(substream);
1✔
908
        closedSubstreamsInsight.append(status.getCode());
1✔
909
      }
1✔
910

911
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
912
        assert savedCloseMasterListenerReason != null;
1✔
913
        listenerSerializeExecutor.execute(
1✔
914
            new Runnable() {
1✔
915
              @Override
916
              public void run() {
917
                isClosed = true;
1✔
918
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
919
                    savedCloseMasterListenerReason.progress,
1✔
920
                    savedCloseMasterListenerReason.metadata);
1✔
921
              }
1✔
922
            });
923
        return;
1✔
924
      }
925

926
      // handle a race between buffer limit exceeded and closed, when setting
927
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
928
      if (substream.bufferLimitExceeded) {
1✔
929
        commitAndRun(substream);
1✔
930
        if (state.winningSubstream == substream) {
1✔
931
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
932
        }
933
        return;
1✔
934
      }
935
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
936
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
937
        commitAndRun(substream);
1✔
938
        if (state.winningSubstream == substream) {
1✔
939
          Status tooManyTransparentRetries = Status.INTERNAL
1✔
940
              .withDescription("Too many transparent retries. Might be a bug in gRPC")
1✔
941
              .withCause(status.asRuntimeException());
1✔
942
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
943
        }
944
        return;
1✔
945
      }
946

947
      if (state.winningSubstream == null) {
1✔
948
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
949
            || (rpcProgress == RpcProgress.REFUSED
950
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
951
          // transparent retry
952
          final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
1✔
953
          if (newSubstream == null) {
1✔
954
            return;
×
955
          }
956
          if (isHedging) {
1✔
957
            synchronized (lock) {
1✔
958
              // Although this operation is not done atomically with
959
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
960
              // activeHedges, so neither does it affect the commitment decision of other threads,
961
              // nor do the commitment decision making threads affect itself.
962
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
963
            }
1✔
964
          }
965

966
          callExecutor.execute(new Runnable() {
1✔
967
            @Override
968
            public void run() {
969
              drain(newSubstream);
1✔
970
            }
1✔
971
          });
972
          return;
1✔
973
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
974
          // For normal retry, nothing need be done here, will just commit.
975
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
976
          if (isHedging) {
1✔
977
            freezeHedging();
×
978
          }
979
        } else {
980
          noMoreTransparentRetry.set(true);
1✔
981

982
          if (isHedging) {
1✔
983
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
984
            if (hedgingPlan.isHedgeable) {
1✔
985
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
986
            }
987
            synchronized (lock) {
1✔
988
              state = state.removeActiveHedge(substream);
1✔
989
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
990
              // Once hasPotentialHedging(state) is false, it will always be false, and then
991
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
992
              // multiple concurrent hedges, one of the hedges will end up committed.
993
              if (hedgingPlan.isHedgeable) {
1✔
994
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
995
                  return;
1✔
996
                }
997
                // else, no activeHedges, no new hedges possible, try to commit
998
              } // else, isHedgeable is false, try to commit
999
            }
1✔
1000
          } else {
1✔
1001
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
1002
            if (retryPlan.shouldRetry) {
1✔
1003
              // retry
1004
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
1✔
1005
              if (newSubstream == null) {
1✔
1006
                return;
×
1007
              }
1008
              // The check state.winningSubstream == null, checking if is not already committed, is
1009
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1010
              FutureCanceller scheduledRetryCopy;
1011
              synchronized (lock) {
1✔
1012
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1013
              }
1✔
1014

1015
              class RetryBackoffRunnable implements Runnable {
1✔
1016
                @Override
1017
                @SuppressWarnings("FutureReturnValueIgnored")
1018
                public void run() {
1019
                  synchronized (scheduledRetryCopy.lock) {
1✔
1020
                    if (scheduledRetryCopy.isCancelled()) {
1✔
1021
                      return;
×
1022
                    } else {
1023
                      scheduledRetryCopy.markCancelled();
1✔
1024
                    }
1025
                  }
1✔
1026

1027
                  callExecutor.execute(
1✔
1028
                      new Runnable() {
1✔
1029
                        @Override
1030
                        public void run() {
1031
                          drain(newSubstream);
1✔
1032
                        }
1✔
1033
                      });
1034
                }
1✔
1035
              }
1036

1037
              scheduledRetryCopy.setFuture(
1✔
1038
                  scheduledExecutorService.schedule(
1✔
1039
                      new RetryBackoffRunnable(),
1040
                      retryPlan.backoffNanos,
1041
                      TimeUnit.NANOSECONDS));
1042
              return;
1✔
1043
            }
1044
          }
1045
        }
1046
      }
1047

1048
      commitAndRun(substream);
1✔
1049
      if (state.winningSubstream == substream) {
1✔
1050
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1051
      }
1052
    }
1✔
1053

1054
    /**
1055
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1056
     * long the backoff should be. The decision does not take the commitment status into account, so
1057
     * caller should check it separately. It also updates the throttle. It does not change state.
1058
     */
1059
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1060
      if (retryPolicy == null) {
1✔
1061
        return new RetryPlan(false, 0);
1✔
1062
      }
1063
      boolean shouldRetry = false;
1✔
1064
      long backoffNanos = 0L;
1✔
1065
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1066
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1067
      boolean isThrottled = false;
1✔
1068
      if (throttle != null) {
1✔
1069
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1070
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1071
        }
1072
      }
1073

1074
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1075
        if (pushbackMillis == null) {
1✔
1076
          if (isRetryableStatusCode) {
1✔
1077
            shouldRetry = true;
1✔
1078
            backoffNanos = intervalWithJitter(nextBackoffIntervalNanos);
1✔
1079
            nextBackoffIntervalNanos = Math.min(
1✔
1080
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1081
                retryPolicy.maxBackoffNanos);
1✔
1082
          } // else no retry
1083
        } else if (pushbackMillis >= 0) {
1✔
1084
          shouldRetry = true;
1✔
1085
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1086
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1087
        } // else no retry
1088
      } // else no retry
1089

1090
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1091
    }
1092

1093
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1094
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1095
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1096
      boolean isThrottled = false;
1✔
1097
      if (throttle != null) {
1✔
1098
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1099
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1100
        }
1101
      }
1102
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1103
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1104
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1105
      }
1106
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1107
    }
1108

1109
    @Nullable
1110
    private Integer getPushbackMills(Metadata trailer) {
1111
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1112
      Integer pushbackMillis = null;
1✔
1113
      if (pushbackStr != null) {
1✔
1114
        try {
1115
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1116
        } catch (NumberFormatException e) {
1✔
1117
          pushbackMillis = -1;
1✔
1118
        }
1✔
1119
      }
1120
      return pushbackMillis;
1✔
1121
    }
1122

1123
    @Override
1124
    public void messagesAvailable(final MessageProducer producer) {
1125
      State savedState = state;
1✔
1126
      checkState(
1✔
1127
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1128
      if (savedState.winningSubstream != substream) {
1✔
1129
        GrpcUtil.closeQuietly(producer);
1✔
1130
        return;
1✔
1131
      }
1132
      listenerSerializeExecutor.execute(
1✔
1133
          new Runnable() {
1✔
1134
            @Override
1135
            public void run() {
1136
              masterListener.messagesAvailable(producer);
1✔
1137
            }
1✔
1138
          });
1139
    }
1✔
1140

1141
    @Override
1142
    public void onReady() {
1143
      // FIXME(#7089): hedging case is broken.
1144
      if (!isReady()) {
1✔
1145
        return;
1✔
1146
      }
1147
      listenerSerializeExecutor.execute(
1✔
1148
          new Runnable() {
1✔
1149
            @Override
1150
            public void run() {
1151
              if (!isClosed) {
1✔
1152
                masterListener.onReady();
1✔
1153
              }
1154
            }
1✔
1155
          });
1156
    }
1✔
1157
  }
1158

1159
  private static final class State {
1160
    /** Committed and the winning substream drained. */
1161
    final boolean passThrough;
1162

1163
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1164
    @Nullable final List<BufferEntry> buffer;
1165

1166
    /**
1167
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1168
     * passThrough; Empty if committed but not passTrough.
1169
     */
1170
    final Collection<Substream> drainedSubstreams;
1171

1172
    /**
1173
     * Unmodifiable collection of all the active hedging substreams.
1174
     *
1175
     * <p>A substream even with the attribute substream.closed being true may be considered still
1176
     * "active" at the moment as long as it is in this collection.
1177
     */
1178
    final Collection<Substream> activeHedges; // not null once isHedging = true
1179

1180
    final int hedgingAttemptCount;
1181

1182
    /** Null until committed. */
1183
    @Nullable final Substream winningSubstream;
1184

1185
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1186
    final boolean cancelled;
1187

1188
    /** No more hedging due to events like drop or pushback. */
1189
    final boolean hedgingFrozen;
1190

1191
    State(
1192
        @Nullable List<BufferEntry> buffer,
1193
        Collection<Substream> drainedSubstreams,
1194
        Collection<Substream> activeHedges,
1195
        @Nullable Substream winningSubstream,
1196
        boolean cancelled,
1197
        boolean passThrough,
1198
        boolean hedgingFrozen,
1199
        int hedgingAttemptCount) {
1✔
1200
      this.buffer = buffer;
1✔
1201
      this.drainedSubstreams =
1✔
1202
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1203
      this.winningSubstream = winningSubstream;
1✔
1204
      this.activeHedges = activeHedges;
1✔
1205
      this.cancelled = cancelled;
1✔
1206
      this.passThrough = passThrough;
1✔
1207
      this.hedgingFrozen = hedgingFrozen;
1✔
1208
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1209

1210
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1211
      checkState(
1✔
1212
          !passThrough || winningSubstream != null,
1213
          "passThrough should imply winningSubstream != null");
1214
      checkState(
1✔
1215
          !passThrough
1216
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1217
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1218
          "passThrough should imply winningSubstream is drained");
1219
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1220
    }
1✔
1221

1222
    @CheckReturnValue
1223
    // GuardedBy RetriableStream.lock
1224
    State cancelled() {
1225
      return new State(
1✔
1226
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1227
          hedgingFrozen, hedgingAttemptCount);
1228
    }
1229

1230
    /** The given substream is drained. */
1231
    @CheckReturnValue
1232
    // GuardedBy RetriableStream.lock
1233
    State substreamDrained(Substream substream) {
1234
      checkState(!passThrough, "Already passThrough");
1✔
1235

1236
      Collection<Substream> drainedSubstreams;
1237
      
1238
      if (substream.closed) {
1✔
1239
        drainedSubstreams = this.drainedSubstreams;
1✔
1240
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1241
        // optimize for 0-retry, which is most of the cases.
1242
        drainedSubstreams = Collections.singletonList(substream);
1✔
1243
      } else {
1244
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1245
        drainedSubstreams.add(substream);
1✔
1246
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1247
      }
1248

1249
      boolean passThrough = winningSubstream != null;
1✔
1250

1251
      List<BufferEntry> buffer = this.buffer;
1✔
1252
      if (passThrough) {
1✔
1253
        checkState(
1✔
1254
            winningSubstream == substream, "Another RPC attempt has already committed");
1255
        buffer = null;
1✔
1256
      }
1257

1258
      return new State(
1✔
1259
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1260
          hedgingFrozen, hedgingAttemptCount);
1261
    }
1262

1263
    /** The given substream is closed. */
1264
    @CheckReturnValue
1265
    // GuardedBy RetriableStream.lock
1266
    State substreamClosed(Substream substream) {
1267
      substream.closed = true;
1✔
1268
      if (this.drainedSubstreams.contains(substream)) {
1✔
1269
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1270
        drainedSubstreams.remove(substream);
1✔
1271
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1272
        return new State(
1✔
1273
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1274
            hedgingFrozen, hedgingAttemptCount);
1275
      } else {
1276
        return this;
1✔
1277
      }
1278
    }
1279

1280
    @CheckReturnValue
1281
    // GuardedBy RetriableStream.lock
1282
    State committed(Substream winningSubstream) {
1283
      checkState(this.winningSubstream == null, "Already committed");
1✔
1284

1285
      boolean passThrough = false;
1✔
1286
      List<BufferEntry> buffer = this.buffer;
1✔
1287
      Collection<Substream> drainedSubstreams;
1288

1289
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1290
        passThrough = true;
1✔
1291
        buffer = null;
1✔
1292
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1293
      } else {
1294
        drainedSubstreams = Collections.emptyList();
1✔
1295
      }
1296

1297
      return new State(
1✔
1298
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1299
          hedgingFrozen, hedgingAttemptCount);
1300
    }
1301

1302
    @CheckReturnValue
1303
    // GuardedBy RetriableStream.lock
1304
    State freezeHedging() {
1305
      if (hedgingFrozen) {
1✔
1306
        return this;
×
1307
      }
1308
      return new State(
1✔
1309
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1310
          true, hedgingAttemptCount);
1311
    }
1312

1313
    @CheckReturnValue
1314
    // GuardedBy RetriableStream.lock
1315
    // state.hedgingAttemptCount is modified only here.
1316
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1317
    State addActiveHedge(Substream substream) {
1318
      // hasPotentialHedging must be true
1319
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1320
      checkState(winningSubstream == null, "already committed");
1✔
1321

1322
      Collection<Substream> activeHedges;
1323
      if (this.activeHedges == null) {
1✔
1324
        activeHedges = Collections.singleton(substream);
1✔
1325
      } else {
1326
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1327
        activeHedges.add(substream);
1✔
1328
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1329
      }
1330

1331
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1332
      return new State(
1✔
1333
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1334
          hedgingFrozen, hedgingAttemptCount);
1335
    }
1336

1337
    @CheckReturnValue
1338
    // GuardedBy RetriableStream.lock
1339
    // The method is only called in Sublistener.closed()
1340
    State removeActiveHedge(Substream substream) {
1341
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1342
      activeHedges.remove(substream);
1✔
1343
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1344

1345
      return new State(
1✔
1346
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1347
          hedgingFrozen, hedgingAttemptCount);
1348
    }
1349

1350
    @CheckReturnValue
1351
    // GuardedBy RetriableStream.lock
1352
    // The method is only called for transparent retry.
1353
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1354
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1355
      activeHedges.remove(oldOne);
1✔
1356
      activeHedges.add(newOne);
1✔
1357
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1358

1359
      return new State(
1✔
1360
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1361
          hedgingFrozen, hedgingAttemptCount);
1362
    }
1363
  }
1364

1365
  /**
1366
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1367
   *  attributes.
1368
   */
1369
  private static final class Substream {
1370
    ClientStream stream;
1371

1372
    // GuardedBy RetriableStream.lock
1373
    boolean closed;
1374

1375
    // setting to true must be GuardedBy RetriableStream.lock
1376
    boolean bufferLimitExceeded;
1377

1378
    final int previousAttemptCount;
1379

1380
    Substream(int previousAttemptCount) {
1✔
1381
      this.previousAttemptCount = previousAttemptCount;
1✔
1382
    }
1✔
1383
  }
1384

1385

1386
  /**
1387
   * Traces the buffer used by a substream.
1388
   */
1389
  class BufferSizeTracer extends ClientStreamTracer {
1390
    // Each buffer size tracer is dedicated to one specific substream.
1391
    private final Substream substream;
1392

1393
    @GuardedBy("lock")
1394
    long bufferNeeded;
1395

1396
    BufferSizeTracer(Substream substream) {
1✔
1397
      this.substream = substream;
1✔
1398
    }
1✔
1399

1400
    /**
1401
     * A message is sent to the wire, so its reference would be released if no retry or
1402
     * hedging were involved. So at this point we have to hold the reference of the message longer
1403
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1404
     */
1405
    @Override
1406
    public void outboundWireSize(long bytes) {
1407
      if (state.winningSubstream != null) {
1✔
1408
        return;
1✔
1409
      }
1410

1411
      Runnable postCommitTask = null;
1✔
1412

1413
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1414
      synchronized (lock) {
1✔
1415
        if (state.winningSubstream != null || substream.closed) {
1✔
1416
          return;
×
1417
        }
1418
        bufferNeeded += bytes;
1✔
1419
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1420
          return;
1✔
1421
        }
1422

1423
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1424
          substream.bufferLimitExceeded = true;
1✔
1425
        } else {
1426
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1427
          long savedChannelBufferUsed =
1✔
1428
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1429
          perRpcBufferUsed = bufferNeeded;
1✔
1430

1431
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1432
            substream.bufferLimitExceeded = true;
1✔
1433
          }
1434
        }
1435

1436
        if (substream.bufferLimitExceeded) {
1✔
1437
          postCommitTask = commit(substream);
1✔
1438
        }
1439
      }
1✔
1440

1441
      if (postCommitTask != null) {
1✔
1442
        postCommitTask.run();
1✔
1443
      }
1444
    }
1✔
1445
  }
1446

1447
  /**
1448
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1449
   *  the Channel. There should be a single instance of it for each channel.
1450
   */
1451
  static final class ChannelBufferMeter {
1✔
1452
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1453

1454
    @VisibleForTesting
1455
    long addAndGet(long newBytesUsed) {
1456
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1457
    }
1458
  }
1459

1460
  /**
1461
   * Used for retry throttling.
1462
   */
1463
  static final class Throttle {
1464

1465
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1466

1467
    /**
1468
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1469
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1470
     * maxTokens.
1471
     */
1472
    final int maxTokens;
1473

1474
    /**
1475
     * Half of {@code maxTokens}.
1476
     */
1477
    final int threshold;
1478

1479
    /**
1480
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1481
     */
1482
    final int tokenRatio;
1483

1484
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1485

1486
    Throttle(float maxTokens, float tokenRatio) {
1✔
1487
      // tokenRatio is up to 3 decimal places
1488
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1489
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1490
      this.threshold = this.maxTokens / 2;
1✔
1491
      tokenCount.set(this.maxTokens);
1✔
1492
    }
1✔
1493

1494
    @VisibleForTesting
1495
    boolean isAboveThreshold() {
1496
      return tokenCount.get() > threshold;
1✔
1497
    }
1498

1499
    /**
1500
     * Counts down the token on qualified failure and checks if it is above the threshold
1501
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1502
     * a not-to-retry pushback.
1503
     */
1504
    @VisibleForTesting
1505
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1506
      while (true) {
1507
        int currentCount = tokenCount.get();
1✔
1508
        if (currentCount == 0) {
1✔
1509
          return false;
1✔
1510
        }
1511
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1512
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1513
        if (updated) {
1✔
1514
          return decremented > threshold;
1✔
1515
        }
1516
      }
×
1517
    }
1518

1519
    @VisibleForTesting
1520
    void onSuccess() {
1521
      while (true) {
1522
        int currentCount = tokenCount.get();
1✔
1523
        if (currentCount == maxTokens) {
1✔
1524
          break;
1✔
1525
        }
1526
        int incremented = currentCount + tokenRatio;
1✔
1527
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1528
        if (updated) {
1✔
1529
          break;
1✔
1530
        }
1531
      }
×
1532
    }
1✔
1533

1534
    @Override
1535
    public boolean equals(Object o) {
1536
      if (this == o) {
1✔
1537
        return true;
×
1538
      }
1539
      if (!(o instanceof Throttle)) {
1✔
1540
        return false;
×
1541
      }
1542
      Throttle that = (Throttle) o;
1✔
1543
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1544
    }
1545

1546
    @Override
1547
    public int hashCode() {
1548
      return Objects.hashCode(maxTokens, tokenRatio);
×
1549
    }
1550
  }
1551

1552
  private static final class RetryPlan {
1553
    final boolean shouldRetry;
1554
    final long backoffNanos;
1555

1556
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1557
      this.shouldRetry = shouldRetry;
1✔
1558
      this.backoffNanos = backoffNanos;
1✔
1559
    }
1✔
1560
  }
1561

1562
  private static final class HedgingPlan {
1563
    final boolean isHedgeable;
1564
    @Nullable
1565
    final Integer hedgingPushbackMillis;
1566

1567
    public HedgingPlan(
1568
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1569
      this.isHedgeable = isHedgeable;
1✔
1570
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1571
    }
1✔
1572
  }
1573

1574
  /** Allows cancelling a Future without racing with setting the future. */
1575
  private static final class FutureCanceller {
1576

1577
    final Object lock;
1578
    @GuardedBy("lock")
1579
    Future<?> future;
1580
    @GuardedBy("lock")
1581
    boolean cancelled;
1582

1583
    FutureCanceller(Object lock) {
1✔
1584
      this.lock = lock;
1✔
1585
    }
1✔
1586

1587
    void setFuture(Future<?> future) {
1588
      boolean wasCancelled;
1589
      synchronized (lock) {
1✔
1590
        wasCancelled = cancelled;
1✔
1591
        if (!wasCancelled) {
1✔
1592
          this.future = future;
1✔
1593
        }
1594
      }
1✔
1595
      if (wasCancelled) {
1✔
1596
        future.cancel(false);
×
1597
      }
1598
    }
1✔
1599

1600
    @GuardedBy("lock")
1601
    @CheckForNull // Must cancel the returned future if not null.
1602
    Future<?> markCancelled() {
1603
      cancelled = true;
1✔
1604
      return future;
1✔
1605
    }
1606

1607
    @GuardedBy("lock")
1608
    boolean isCancelled() {
1609
      return cancelled;
1✔
1610
    }
1611
  }
1612
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc