• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20190

10 Mar 2026 10:33PM UTC coverage: 88.687% (-0.02%) from 88.705%
#20190

push

github

web-flow
android-interop-testing: Remove usage of MultiDexApplication

Since we're only supporting API levels 23+, all the supported Android
versions handle multidex natively, and without any bugs to work around.

Also bump some minSdkVersion that didn't get updated in fa7b52b so
that multiDex is actually enabled by default.

See also b/476359563

35467 of 39991 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.07
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.Compressor;
30
import io.grpc.Deadline;
31
import io.grpc.DecompressorRegistry;
32
import io.grpc.Metadata;
33
import io.grpc.MethodDescriptor;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import java.io.InputStream;
38
import java.lang.Thread.UncaughtExceptionHandler;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.List;
43
import java.util.Random;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.concurrent.atomic.AtomicInteger;
50
import java.util.concurrent.atomic.AtomicLong;
51
import javax.annotation.CheckForNull;
52
import javax.annotation.Nullable;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  @VisibleForTesting
57
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
1✔
58
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
1✔
59

60
  @VisibleForTesting
61
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
1✔
62
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
1✔
63

64
  private static final Status CANCELLED_BECAUSE_COMMITTED =
1✔
65
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
1✔
66

67
  private final MethodDescriptor<ReqT, ?> method;
68
  private final Executor callExecutor;
69
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
1✔
70
      new UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          throw Status.fromThrowable(e)
×
74
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
×
75
              .asRuntimeException();
×
76
        }
77
      }
78
  );
79
  private final ScheduledExecutorService scheduledExecutorService;
80
  // Must not modify it.
81
  private final Metadata headers;
82
  @Nullable
83
  private final RetryPolicy retryPolicy;
84
  @Nullable
85
  private final HedgingPolicy hedgingPolicy;
86
  private final boolean isHedging;
87

88
  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89
  private final Object lock = new Object();
1✔
90

91
  private final ChannelBufferMeter channelBufferUsed;
92
  private final long perRpcBufferLimit;
93
  private final long channelBufferLimit;
94
  @Nullable
95
  private final Throttle throttle;
96
  @GuardedBy("lock")
1✔
97
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98

99
  private volatile State state = new State(
1✔
100
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
1✔
101
      false, 0);
102

103
  /**
104
   * Either non-local transparent retry happened or reached server's application logic.
105
   *
106
   * <p>Note that local-only transparent retries are unlimited.
107
   */
108
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
1✔
109
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
1✔
110
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
1✔
111
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112

113
  // Used for recording the share of buffer used for the current call out of the channel buffer.
114
  // This field would not be necessary if there is no channel buffer limit.
115
  @GuardedBy("lock")
116
  private long perRpcBufferUsed;
117

118
  private ClientStreamListener masterListener;
119
  @GuardedBy("lock")
120
  private FutureCanceller scheduledRetry;
121
  @GuardedBy("lock")
122
  private FutureCanceller scheduledHedging;
123
  private long nextBackoffIntervalNanos;
124
  private Status cancellationStatus;
125
  private boolean isClosed;
126

127
  /**
   * Creates a retriable stream for one logical RPC.
   *
   * <p>At most one of {@code retryPolicy} and {@code hedgingPolicy} may be non-null. The stream
   * runs in hedging mode exactly when {@code hedgingPolicy} is supplied. When a retry policy is
   * supplied, the first backoff interval is seeded from its {@code initialBackoffNanos}.
   *
   * @throws IllegalArgumentException if both a retry policy and a hedging policy are given
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method, Metadata headers,
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
      @Nullable Throttle throttle) {
    // Reject contradictory configuration before touching any field.
    checkArgument(
        retryPolicy == null || hedgingPolicy == null,
        "Should not provide both retryPolicy and hedgingPolicy");

    // Call description and executors.
    this.method = method;
    this.headers = headers;
    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;

    // Buffer accounting.
    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;

    // Retry / hedging configuration.
    this.retryPolicy = retryPolicy;
    this.hedgingPolicy = hedgingPolicy;
    this.isHedging = hedgingPolicy != null;
    this.throttle = throttle;
    if (retryPolicy != null) {
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
    }
  }
1✔
151

152
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153
  @Nullable // null if already committed
154
  @CheckReturnValue
155
  private Runnable commit(final Substream winningSubstream) {
156
    synchronized (lock) {
1✔
157
      if (state.winningSubstream != null) {
1✔
158
        return null;
1✔
159
      }
160
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
1✔
161

162
      state = state.committed(winningSubstream);
1✔
163

164
      // subtract the share of this RPC from channelBufferUsed.
165
      channelBufferUsed.addAndGet(-perRpcBufferUsed);
1✔
166

167
      final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
1✔
168
      final Future<?> retryFuture;
169
      final boolean retryWasScheduled = scheduledRetry != null;
1✔
170
      if (retryWasScheduled) {
1✔
171
        retryFuture = scheduledRetry.markCancelled();
1✔
172
        scheduledRetry = null;
1✔
173
      } else {
174
        retryFuture = null;
1✔
175
      }
176
      // cancel the scheduled hedging if it is scheduled prior to the commitment
177
      final Future<?> hedgingFuture;
178
      if (scheduledHedging != null) {
1✔
179
        hedgingFuture = scheduledHedging.markCancelled();
1✔
180
        scheduledHedging = null;
1✔
181
      } else {
182
        hedgingFuture = null;
1✔
183
      }
184

185
      class CommitTask implements Runnable {
1✔
186
        @Override
187
        public void run() {
188
          // For hedging only, not needed for normal retry
189
          for (Substream substream : savedDrainedSubstreams) {
1✔
190
            if (substream != winningSubstream) {
1✔
191
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
1✔
192
            }
193
          }
1✔
194
          if (retryWasScheduled) {
1✔
195
            if (retryFuture != null) {
1✔
196
              retryFuture.cancel(false);
1✔
197
            }
198
            if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
199
              assert savedCloseMasterListenerReason != null;
×
200
              listenerSerializeExecutor.execute(
×
201
                  new Runnable() {
×
202
                    @Override
203
                    public void run() {
204
                      isClosed = true;
×
205
                      masterListener.closed(savedCloseMasterListenerReason.status,
×
206
                          savedCloseMasterListenerReason.progress,
×
207
                          savedCloseMasterListenerReason.metadata);
×
208
                    }
×
209
                  });
210
            }
211
          }
212

213
          if (hedgingFuture != null) {
1✔
214
            hedgingFuture.cancel(false);
1✔
215
          }
216

217
          postCommit();
1✔
218
        }
1✔
219
      }
220

221
      return new CommitTask();
1✔
222
    }
223
  }
224

225
  /**
   * Hook invoked as the last step of the task returned by {@code commit()}, i.e. after the
   * non-winning drained substreams and any detached retry/hedging futures have been cancelled.
   */
  abstract void postCommit();
226

227
  /**
228
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
229
   * for only once. The post commit task cancels other non-winning streams on separate transport
230
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
231
   * transports.(issues/10314)
232
   * This method should be called only in subListener callbacks. This guarantees callExecutor
233
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
234
   * decorative. That is because:
235
   * For a successful winning stream, other streams won't attempt to close master listener.
236
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
237
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
238
   * callExecutor scheduling happens-before that.
239
   */
240
  private void commitAndRun(Substream winningSubstream) {
241
    Runnable postCommitTask = commit(winningSubstream);
1✔
242

243
    if (postCommitTask != null) {
1✔
244
      callExecutor.execute(postCommitTask);
1✔
245
    }
246
  }
1✔
247

248
  // returns null means we should not create new sub streams, e.g. cancelled or
249
  // other close condition is met for retriableStream.
250
  @Nullable
251
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry,
252
                                    boolean isHedgedStream) {
253
    int inFlight;
254
    do {
255
      inFlight = inFlightSubStreams.get();
1✔
256
      if (inFlight < 0) {
1✔
257
        return null;
×
258
      }
259
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
260
    Substream sub = new Substream(previousAttemptCount);
1✔
261
    // one tracer per substream
262
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
263
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
264
      @Override
265
      public ClientStreamTracer newClientStreamTracer(
266
          ClientStreamTracer.StreamInfo info, Metadata headers) {
267
        return bufferSizeTracer;
1✔
268
      }
269
    };
270

271
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
272
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
273
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry,
1✔
274
        isHedgedStream);
275
    return sub;
1✔
276
  }
277

278
  /**
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
   * Client stream is not yet started.
   *
   * @param headers the headers for this attempt; createSubstream() passes the result of
   *     updateHeaders() here
   * @param tracerFactory factory for the per-substream tracer
   * @param previousAttempts the attempt count passed through from createSubstream()
   * @param isTransparentRetry transparent-retry flag passed through from createSubstream()
   * @param isHedgedStream hedged-stream flag passed through from createSubstream()
   */
282
  abstract ClientStream newSubstream(
283
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
284
      boolean isTransparentRetry, boolean isHedgedStream);
285

286
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
287
  @VisibleForTesting
288
  final Metadata updateHeaders(
289
      Metadata originalHeaders, int previousAttemptCount) {
290
    Metadata newHeaders = new Metadata();
1✔
291
    newHeaders.merge(originalHeaders);
1✔
292
    if (previousAttemptCount > 0) {
1✔
293
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
294
    }
295
    return newHeaders;
1✔
296
  }
297

298
  private void drain(Substream substream) {
299
    int index = 0;
1✔
300
    int chunk = 0x80;
1✔
301
    List<BufferEntry> list = null;
1✔
302
    boolean streamStarted = false;
1✔
303
    Runnable onReadyRunnable = null;
1✔
304

305
    while (true) {
306
      State savedState;
307

308
      synchronized (lock) {
1✔
309
        savedState = state;
1✔
310
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
311
          // committed but not me, to be cancelled
312
          break;
1✔
313
        }
314
        if (savedState.cancelled) {
1✔
315
          break;
1✔
316
        }
317
        if (index == savedState.buffer.size()) { // I'm drained
1✔
318
          state = savedState.substreamDrained(substream);
1✔
319
          if (!isReady()) {
1✔
320
            return;
1✔
321
          }
322
          onReadyRunnable = new Runnable() {
1✔
323
            @Override
324
            public void run() {
325
              if (!isClosed) {
1✔
326
                masterListener.onReady();
1✔
327
              }
328
            }
1✔
329
          };
330
          break;
1✔
331
        }
332

333
        if (substream.closed) {
1✔
334
          return;
×
335
        }
336

337
        int stop = Math.min(index + chunk, savedState.buffer.size());
1✔
338
        if (list == null) {
1✔
339
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
1✔
340
        } else {
341
          list.clear();
1✔
342
          list.addAll(savedState.buffer.subList(index, stop));
1✔
343
        }
344
        index = stop;
1✔
345
      }
1✔
346

347
      for (BufferEntry bufferEntry : list) {
1✔
348
        bufferEntry.runWith(substream);
1✔
349
        if (bufferEntry instanceof RetriableStream.StartEntry) {
1✔
350
          streamStarted = true;
1✔
351
        }
352
        savedState = state;
1✔
353
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
354
          // committed but not me, to be cancelled
355
          break;
1✔
356
        }
357
        if (savedState.cancelled) {
1✔
358
          break;
1✔
359
        }
360
      }
1✔
361
    }
1✔
362

363
    if (onReadyRunnable != null) {
1✔
364
      listenerSerializeExecutor.execute(onReadyRunnable);
1✔
365
      return;
1✔
366
    }
367

368
    if (!streamStarted) {
1✔
369
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
370
      substream.stream.start(new Sublistener(substream));
1✔
371
    }
372
    substream.stream.cancel(
1✔
373
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
1✔
374
  }
1✔
375

376
  /**
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
   *
   * <p>start() calls this before creating the first substream; when the result is non-null,
   * start() cancels the stream with that status and returns.
   *
   * @return the shutdown status, or {@code null} when the stream may proceed
   */
379
  @CheckReturnValue
380
  @Nullable
381
  abstract Status prestart();
382

383
  class StartEntry implements BufferEntry {
1✔
384
    @Override
385
    public void runWith(Substream substream) {
386
      substream.stream.start(new Sublistener(substream));
1✔
387
    }
1✔
388
  }
389

390
  /** Starts the first RPC attempt. */
391
  @Override
392
  public final void start(ClientStreamListener listener) {
393
    masterListener = listener;
1✔
394

395
    Status shutdownStatus = prestart();
1✔
396

397
    if (shutdownStatus != null) {
1✔
398
      cancel(shutdownStatus);
1✔
399
      return;
1✔
400
    }
401

402
    synchronized (lock) {
1✔
403
      state.buffer.add(new StartEntry());
1✔
404
    }
1✔
405

406
    Substream substream = createSubstream(0, false, false);
1✔
407
    if (substream == null) {
1✔
408
      return;
×
409
    }
410
    if (isHedging) {
1✔
411
      FutureCanceller scheduledHedgingRef = null;
1✔
412

413
      synchronized (lock) {
1✔
414
        state = state.addActiveHedge(substream);
1✔
415
        if (hasPotentialHedging(state)
1✔
416
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
417
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
418
        }
419
      }
1✔
420

421
      if (scheduledHedgingRef != null) {
1✔
422
        scheduledHedgingRef.setFuture(
1✔
423
            scheduledExecutorService.schedule(
1✔
424
                new HedgingRunnable(scheduledHedgingRef),
425
                hedgingPolicy.hedgingDelayNanos,
426
                TimeUnit.NANOSECONDS));
427
      }
428
    }
429

430
    drain(substream);
1✔
431
  }
1✔
432

433
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
434
  private void pushbackHedging(@Nullable Integer delayMillis) {
435
    if (delayMillis == null) {
1✔
436
      return;
1✔
437
    }
438
    if (delayMillis < 0) {
1✔
439
      freezeHedging();
1✔
440
      return;
1✔
441
    }
442

443
    // Cancels the current scheduledHedging and reschedules a new one.
444
    FutureCanceller future;
445
    Future<?> futureToBeCancelled;
446

447
    synchronized (lock) {
1✔
448
      if (scheduledHedging == null) {
1✔
449
        return;
×
450
      }
451

452
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
453
      scheduledHedging = future = new FutureCanceller(lock);
1✔
454
    }
1✔
455

456
    if (futureToBeCancelled != null) {
1✔
457
      futureToBeCancelled.cancel(false);
1✔
458
    }
459
    future.setFuture(scheduledExecutorService.schedule(
1✔
460
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
461
  }
1✔
462

463
  private final class HedgingRunnable implements Runnable {
464

465
    // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
466
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
467
    final FutureCanceller scheduledHedgingRef;
468

469
    HedgingRunnable(FutureCanceller scheduledHedging) {
1✔
470
      scheduledHedgingRef = scheduledHedging;
1✔
471
    }
1✔
472

473
    @Override
474
    public void run() {
475
      // It's safe to read state.hedgingAttemptCount here.
476
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
477
      // until state.addActiveHedge() is called subsequently, even the state could possibly
478
      // change.
479
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false, true);
1✔
480
      if (newSubstream == null) {
1✔
481
        return;
×
482
      }
483
      callExecutor.execute(
1✔
484
          new Runnable() {
1✔
485
            @SuppressWarnings("GuardedBy")  //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
486
            @Override
487
            public void run() {
488
              boolean cancelled = false;
1✔
489
              FutureCanceller future = null;
1✔
490

491
              synchronized (lock) {
1✔
492
                if (scheduledHedgingRef.isCancelled()) {
1✔
493
                  cancelled = true;
×
494
                } else {
495
                  state = state.addActiveHedge(newSubstream);
1✔
496
                  if (hasPotentialHedging(state)
1✔
497
                      && (throttle == null || throttle.isAboveThreshold())) {
1✔
498
                    scheduledHedging = future = new FutureCanceller(lock);
1✔
499
                  } else {
500
                    state = state.freezeHedging();
1✔
501
                    scheduledHedging = null;
1✔
502
                  }
503
                }
504
              }
1✔
505

506
              if (cancelled) {
1✔
507
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
508
                newSubstream.stream.start(new Sublistener(newSubstream));
×
509
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
×
510
                return;
×
511
              }
512
              if (future != null) {
1✔
513
                future.setFuture(
1✔
514
                    scheduledExecutorService.schedule(
1✔
515
                        new HedgingRunnable(future),
516
                        hedgingPolicy.hedgingDelayNanos,
1✔
517
                        TimeUnit.NANOSECONDS));
518
              }
519
              drain(newSubstream);
1✔
520
            }
1✔
521
          });
522
    }
1✔
523
  }
524

525
  @Override
526
  public final void cancel(final Status reason) {
527
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
1✔
528
    noopSubstream.stream = new NoopClientStream();
1✔
529
    Runnable runnable = commit(noopSubstream);
1✔
530

531
    if (runnable != null) {
1✔
532
      synchronized (lock) {
1✔
533
        state = state.substreamDrained(noopSubstream);
1✔
534
      }
1✔
535
      runnable.run();
1✔
536
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
1✔
537
      return;
1✔
538
    }
539

540
    Substream winningSubstreamToCancel = null;
1✔
541
    synchronized (lock) {
1✔
542
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
1✔
543
        winningSubstreamToCancel = state.winningSubstream;
1✔
544
      } else { // the winningSubstream will be cancelled while draining
545
        cancellationStatus = reason;
1✔
546
      }
547
      state = state.cancelled();
1✔
548
    }
1✔
549
    if (winningSubstreamToCancel != null) {
1✔
550
      winningSubstreamToCancel.stream.cancel(reason);
1✔
551
    }
552
  }
1✔
553

554
  private void delayOrExecute(BufferEntry bufferEntry) {
555
    Collection<Substream> savedDrainedSubstreams;
556
    synchronized (lock) {
1✔
557
      if (!state.passThrough) {
1✔
558
        state.buffer.add(bufferEntry);
1✔
559
      }
560
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
561
    }
1✔
562

563
    for (Substream substream : savedDrainedSubstreams) {
1✔
564
      bufferEntry.runWith(substream);
1✔
565
    }
1✔
566
  }
1✔
567

568
  /**
569
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
570
   * InputStream for buffering.
571
   */
572
  @Override
573
  public final void writeMessage(InputStream message) {
574
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
×
575
  }
576

577
  final void sendMessage(final ReqT message) {
578
    State savedState = state;
1✔
579
    if (savedState.passThrough) {
1✔
580
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
581
      return;
1✔
582
    }
583

584
    class SendMessageEntry implements BufferEntry {
1✔
585
      @Override
586
      public void runWith(Substream substream) {
587
        substream.stream.writeMessage(method.streamRequest(message));
1✔
588
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
589
        // flushes (or half close), but retry appears to have a code path that the flushes may
590
        // not happen. The code needs to be fixed and this removed. See #9340.
591
        substream.stream.flush();
1✔
592
      }
1✔
593
    }
594

595
    delayOrExecute(new SendMessageEntry());
1✔
596
  }
1✔
597

598
  @Override
599
  public final void request(final int numMessages) {
600
    State savedState = state;
1✔
601
    if (savedState.passThrough) {
1✔
602
      savedState.winningSubstream.stream.request(numMessages);
1✔
603
      return;
1✔
604
    }
605

606
    class RequestEntry implements BufferEntry {
1✔
607
      @Override
608
      public void runWith(Substream substream) {
609
        substream.stream.request(numMessages);
1✔
610
      }
1✔
611
    }
612

613
    delayOrExecute(new RequestEntry());
1✔
614
  }
1✔
615

616
  @Override
617
  public final void flush() {
618
    State savedState = state;
1✔
619
    if (savedState.passThrough) {
1✔
620
      savedState.winningSubstream.stream.flush();
1✔
621
      return;
1✔
622
    }
623

624
    class FlushEntry implements BufferEntry {
1✔
625
      @Override
626
      public void runWith(Substream substream) {
627
        substream.stream.flush();
1✔
628
      }
1✔
629
    }
630

631
    delayOrExecute(new FlushEntry());
1✔
632
  }
1✔
633

634
  @Override
635
  public final boolean isReady() {
636
    for (Substream substream : state.drainedSubstreams) {
1✔
637
      if (substream.stream.isReady()) {
1✔
638
        return true;
1✔
639
      }
640
    }
1✔
641
    return false;
1✔
642
  }
643

644
  @Override
645
  public void optimizeForDirectExecutor() {
646
    class OptimizeDirectEntry implements BufferEntry {
1✔
647
      @Override
648
      public void runWith(Substream substream) {
649
        substream.stream.optimizeForDirectExecutor();
1✔
650
      }
1✔
651
    }
652

653
    delayOrExecute(new OptimizeDirectEntry());
1✔
654
  }
1✔
655

656
  @Override
657
  public final void setCompressor(final Compressor compressor) {
658
    class CompressorEntry implements BufferEntry {
1✔
659
      @Override
660
      public void runWith(Substream substream) {
661
        substream.stream.setCompressor(compressor);
1✔
662
      }
1✔
663
    }
664

665
    delayOrExecute(new CompressorEntry());
1✔
666
  }
1✔
667

668
  @Override
669
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
670
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
671
      @Override
672
      public void runWith(Substream substream) {
673
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
674
      }
1✔
675
    }
676

677
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
678
  }
1✔
679

680
  @Override
681
  public final void setMessageCompression(final boolean enable) {
682
    class MessageCompressionEntry implements BufferEntry {
1✔
683
      @Override
684
      public void runWith(Substream substream) {
685
        substream.stream.setMessageCompression(enable);
1✔
686
      }
1✔
687
    }
688

689
    delayOrExecute(new MessageCompressionEntry());
1✔
690
  }
1✔
691

692
  @Override
693
  public final void halfClose() {
694
    class HalfCloseEntry implements BufferEntry {
1✔
695
      @Override
696
      public void runWith(Substream substream) {
697
        substream.stream.halfClose();
1✔
698
      }
1✔
699
    }
700

701
    delayOrExecute(new HalfCloseEntry());
1✔
702
  }
1✔
703

704
  @Override
705
  public final void setAuthority(final String authority) {
706
    class AuthorityEntry implements BufferEntry {
1✔
707
      @Override
708
      public void runWith(Substream substream) {
709
        substream.stream.setAuthority(authority);
1✔
710
      }
1✔
711
    }
712

713
    delayOrExecute(new AuthorityEntry());
1✔
714
  }
1✔
715

716
  @Override
717
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
718
    class DecompressorRegistryEntry implements BufferEntry {
1✔
719
      @Override
720
      public void runWith(Substream substream) {
721
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
722
      }
1✔
723
    }
724

725
    delayOrExecute(new DecompressorRegistryEntry());
1✔
726
  }
1✔
727

728
  @Override
729
  public final void setMaxInboundMessageSize(final int maxSize) {
730
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
731
      @Override
732
      public void runWith(Substream substream) {
733
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
734
      }
1✔
735
    }
736

737
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
738
  }
1✔
739

740
  @Override
741
  public final void setMaxOutboundMessageSize(final int maxSize) {
742
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
743
      @Override
744
      public void runWith(Substream substream) {
745
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
746
      }
1✔
747
    }
748

749
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
750
  }
1✔
751

752
  @Override
753
  public final void setDeadline(final Deadline deadline) {
754
    class DeadlineEntry implements BufferEntry {
1✔
755
      @Override
756
      public void runWith(Substream substream) {
757
        substream.stream.setDeadline(deadline);
1✔
758
      }
1✔
759
    }
760

761
    delayOrExecute(new DeadlineEntry());
1✔
762
  }
1✔
763

764
  @Override
765
  public final Attributes getAttributes() {
766
    if (state.winningSubstream != null) {
1✔
767
      return state.winningSubstream.stream.getAttributes();
1✔
768
    }
769
    return Attributes.EMPTY;
×
770
  }
771

772
  @Override
773
  public void appendTimeoutInsight(InsightBuilder insight) {
774
    State currentState;
775
    synchronized (lock) {
1✔
776
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
777
      currentState = state;
1✔
778
    }
1✔
779
    if (currentState.winningSubstream != null) {
1✔
780
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
781
      // of the winning substream, they may not have received closed() notifications yet, thus they
782
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
783
      // We need to figure out how to include them.
784
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
785
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
786
      insight.appendKeyValue("committed", substreamInsight);
1✔
787
    } else {
1✔
788
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
789
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
790
      // and are still catching up with buffered requests (in other words, still draining) will not
791
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
792
      // be indistinguishable from the case where those streams are to be created a little late due
793
      // to delays in the timer.
794
      for (Substream sub : currentState.drainedSubstreams) {
1✔
795
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
796
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
797
        openSubstreamsInsight.append(substreamInsight);
1✔
798
      }
1✔
799
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
800
    }
801
  }
1✔
802

803
  private static Random random = new Random();
1✔
804

805
  @VisibleForTesting
806
  static void setRandom(Random random) {
807
    RetriableStream.random = random;
1✔
808
  }
1✔
809

810
  /**
811
   * Whether there is any potential hedge at the moment. A false return value implies there is
812
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
813
   * when calling this method, unless otherwise the rpc is committed.
814
   */
815
  // only called when isHedging is true
816
  @GuardedBy("lock")
817
  private boolean hasPotentialHedging(State state) {
818
    return state.winningSubstream == null
1✔
819
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
820
        && !state.hedgingFrozen;
821
  }
822

823
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
824
  private void freezeHedging() {
825
    Future<?> futureToBeCancelled = null;
1✔
826
    synchronized (lock) {
1✔
827
      if (scheduledHedging != null) {
1✔
828
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
829
        scheduledHedging = null;
1✔
830
      }
831
      state = state.freezeHedging();
1✔
832
    }
1✔
833

834
    if (futureToBeCancelled != null) {
1✔
835
      futureToBeCancelled.cancel(false);
1✔
836
    }
837
  }
1✔
838

839
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
840
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
841
        metadata);
842
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
843
      listenerSerializeExecutor.execute(
1✔
844
          new Runnable() {
1✔
845
            @Override
846
            public void run() {
847
              isClosed = true;
1✔
848
              masterListener.closed(status, progress, metadata);
1✔
849
            }
1✔
850
          });
851
    }
852
  }
1✔
853

854
  private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil
1✔
855
          .getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true);
1✔
856

857
  public static long intervalWithJitter(long intervalNanos) {
858
    double inverseJitterFactor = isExperimentalRetryJitterEnabled
1✔
859
            ? 0.4 * random.nextDouble() + 0.8 : random.nextDouble();
1✔
860
    return (long) (intervalNanos * inverseJitterFactor);
1✔
861
  }
862

863
  private static final class SavedCloseMasterListenerReason {
864
    private final Status status;
865
    private final RpcProgress progress;
866
    private final Metadata metadata;
867

868
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
869
      this.status = status;
1✔
870
      this.progress = progress;
1✔
871
      this.metadata = metadata;
1✔
872
    }
1✔
873
  }
874

875
  private interface BufferEntry {
876
    /** Replays the buffer entry with the given stream. */
877
    void runWith(Substream substream);
878
  }
879

880
  private final class Sublistener implements ClientStreamListener {
1✔
881
    final Substream substream;
882

883
    Sublistener(Substream substream) {
1✔
884
      this.substream = substream;
1✔
885
    }
1✔
886

887
    @Override
888
    public void headersRead(final Metadata headers) {
889
      if (substream.previousAttemptCount > 0) {
1✔
890
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
891
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
892
      }
893
      commitAndRun(substream);
1✔
894
      if (state.winningSubstream == substream) {
1✔
895
        if (throttle != null) {
1✔
896
          throttle.onSuccess();
1✔
897
        }
898
        listenerSerializeExecutor.execute(
1✔
899
            new Runnable() {
1✔
900
              @Override
901
              public void run() {
902
                masterListener.headersRead(headers);
1✔
903
              }
1✔
904
            });
905
      }
906
    }
1✔
907

908
    @Override
909
    public void closed(
910
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
911
      synchronized (lock) {
1✔
912
        state = state.substreamClosed(substream);
1✔
913
        closedSubstreamsInsight.append(status.getCode());
1✔
914
      }
1✔
915

916
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
917
        assert savedCloseMasterListenerReason != null;
1✔
918
        listenerSerializeExecutor.execute(
1✔
919
            new Runnable() {
1✔
920
              @Override
921
              public void run() {
922
                isClosed = true;
1✔
923
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
924
                    savedCloseMasterListenerReason.progress,
1✔
925
                    savedCloseMasterListenerReason.metadata);
1✔
926
              }
1✔
927
            });
928
        return;
1✔
929
      }
930

931
      // handle a race between buffer limit exceeded and closed, when setting
932
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
933
      if (substream.bufferLimitExceeded) {
1✔
934
        commitAndRun(substream);
1✔
935
        if (state.winningSubstream == substream) {
1✔
936
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
937
        }
938
        return;
1✔
939
      }
940
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
941
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
942
        commitAndRun(substream);
1✔
943
        if (state.winningSubstream == substream) {
1✔
944
          Status tooManyTransparentRetries = GrpcUtil.statusWithDetails(
1✔
945
              Status.Code.INTERNAL, "Too many transparent retries. Might be a bug in gRPC", status);
946
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
947
        }
948
        return;
1✔
949
      }
950

951
      if (state.winningSubstream == null) {
1✔
952
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
953
            || (rpcProgress == RpcProgress.REFUSED
954
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
955
          // transparent retry
956
          final Substream newSubstream = createSubstream(substream.previousAttemptCount,
1✔
957
              true, false);
958
          if (newSubstream == null) {
1✔
959
            return;
×
960
          }
961
          if (isHedging) {
1✔
962
            synchronized (lock) {
1✔
963
              // Although this operation is not done atomically with
964
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
965
              // activeHedges, so neither does it affect the commitment decision of other threads,
966
              // nor do the commitment decision making threads affect itself.
967
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
968
            }
1✔
969
          }
970

971
          callExecutor.execute(new Runnable() {
1✔
972
            @Override
973
            public void run() {
974
              drain(newSubstream);
1✔
975
            }
1✔
976
          });
977
          return;
1✔
978
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
979
          // For normal retry, nothing need be done here, will just commit.
980
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
981
          if (isHedging) {
1✔
982
            freezeHedging();
×
983
          }
984
        } else {
985
          noMoreTransparentRetry.set(true);
1✔
986

987
          if (isHedging) {
1✔
988
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
989
            if (hedgingPlan.isHedgeable) {
1✔
990
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
991
            }
992
            synchronized (lock) {
1✔
993
              state = state.removeActiveHedge(substream);
1✔
994
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
995
              // Once hasPotentialHedging(state) is false, it will always be false, and then
996
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
997
              // multiple concurrent hedges, one of the hedges will end up committed.
998
              if (hedgingPlan.isHedgeable) {
1✔
999
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
1000
                  return;
1✔
1001
                }
1002
                // else, no activeHedges, no new hedges possible, try to commit
1003
              } // else, isHedgeable is false, try to commit
1004
            }
1✔
1005
          } else {
1✔
1006
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
1007
            if (retryPlan.shouldRetry) {
1✔
1008
              // retry
1009
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1,
1✔
1010
                  false, false);
1011
              if (newSubstream == null) {
1✔
1012
                return;
×
1013
              }
1014
              // The check state.winningSubstream == null, checking if is not already committed, is
1015
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1016
              FutureCanceller scheduledRetryCopy;
1017
              synchronized (lock) {
1✔
1018
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1019
              }
1✔
1020

1021
              class RetryBackoffRunnable implements Runnable {
1✔
1022
                @Override
1023
                @SuppressWarnings("FutureReturnValueIgnored")
1024
                public void run() {
1025
                  synchronized (scheduledRetryCopy.lock) {
1✔
1026
                    if (scheduledRetryCopy.isCancelled()) {
1✔
1027
                      return;
×
1028
                    } else {
1029
                      scheduledRetryCopy.markCancelled();
1✔
1030
                    }
1031
                  }
1✔
1032

1033
                  callExecutor.execute(
1✔
1034
                      new Runnable() {
1✔
1035
                        @Override
1036
                        public void run() {
1037
                          drain(newSubstream);
1✔
1038
                        }
1✔
1039
                      });
1040
                }
1✔
1041
              }
1042

1043
              scheduledRetryCopy.setFuture(
1✔
1044
                  scheduledExecutorService.schedule(
1✔
1045
                      new RetryBackoffRunnable(),
1046
                      retryPlan.backoffNanos,
1047
                      TimeUnit.NANOSECONDS));
1048
              return;
1✔
1049
            }
1050
          }
1051
        }
1052
      }
1053

1054
      commitAndRun(substream);
1✔
1055
      if (state.winningSubstream == substream) {
1✔
1056
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1057
      }
1058
    }
1✔
1059

1060
    /**
1061
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1062
     * long the backoff should be. The decision does not take the commitment status into account, so
1063
     * caller should check it separately. It also updates the throttle. It does not change state.
1064
     */
1065
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1066
      if (retryPolicy == null) {
1✔
1067
        return new RetryPlan(false, 0);
1✔
1068
      }
1069
      boolean shouldRetry = false;
1✔
1070
      long backoffNanos = 0L;
1✔
1071
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1072
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1073
      boolean isThrottled = false;
1✔
1074
      if (throttle != null) {
1✔
1075
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1076
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1077
        }
1078
      }
1079

1080
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1081
        if (pushbackMillis == null) {
1✔
1082
          if (isRetryableStatusCode) {
1✔
1083
            shouldRetry = true;
1✔
1084
            backoffNanos = intervalWithJitter(nextBackoffIntervalNanos);
1✔
1085
            nextBackoffIntervalNanos = Math.min(
1✔
1086
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1087
                retryPolicy.maxBackoffNanos);
1✔
1088
          } // else no retry
1089
        } else if (pushbackMillis >= 0) {
1✔
1090
          shouldRetry = true;
1✔
1091
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1092
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1093
        } // else no retry
1094
      } // else no retry
1095

1096
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1097
    }
1098

1099
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1100
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1101
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1102
      boolean isThrottled = false;
1✔
1103
      if (throttle != null) {
1✔
1104
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1105
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1106
        }
1107
      }
1108
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1109
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1110
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1111
      }
1112
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1113
    }
1114

1115
    @Nullable
1116
    private Integer getPushbackMills(Metadata trailer) {
1117
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1118
      Integer pushbackMillis = null;
1✔
1119
      if (pushbackStr != null) {
1✔
1120
        try {
1121
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1122
        } catch (NumberFormatException e) {
1✔
1123
          pushbackMillis = -1;
1✔
1124
        }
1✔
1125
      }
1126
      return pushbackMillis;
1✔
1127
    }
1128

1129
    @Override
1130
    public void messagesAvailable(final MessageProducer producer) {
1131
      State savedState = state;
1✔
1132
      checkState(
1✔
1133
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1134
      if (savedState.winningSubstream != substream) {
1✔
1135
        GrpcUtil.closeQuietly(producer);
1✔
1136
        return;
1✔
1137
      }
1138
      listenerSerializeExecutor.execute(
1✔
1139
          new Runnable() {
1✔
1140
            @Override
1141
            public void run() {
1142
              masterListener.messagesAvailable(producer);
1✔
1143
            }
1✔
1144
          });
1145
    }
1✔
1146

1147
    @Override
1148
    public void onReady() {
1149
      // FIXME(#7089): hedging case is broken.
1150
      if (!isReady()) {
1✔
1151
        return;
1✔
1152
      }
1153
      listenerSerializeExecutor.execute(
1✔
1154
          new Runnable() {
1✔
1155
            @Override
1156
            public void run() {
1157
              if (!isClosed) {
1✔
1158
                masterListener.onReady();
1✔
1159
              }
1160
            }
1✔
1161
          });
1162
    }
1✔
1163
  }
1164

1165
  private static final class State {
1166
    /** Committed and the winning substream drained. */
1167
    final boolean passThrough;
1168

1169
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1170
    @Nullable final List<BufferEntry> buffer;
1171

1172
    /**
1173
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1174
     * passThrough; Empty if committed but not passTrough.
1175
     */
1176
    final Collection<Substream> drainedSubstreams;
1177

1178
    /**
1179
     * Unmodifiable collection of all the active hedging substreams.
1180
     *
1181
     * <p>A substream even with the attribute substream.closed being true may be considered still
1182
     * "active" at the moment as long as it is in this collection.
1183
     */
1184
    final Collection<Substream> activeHedges; // not null once isHedging = true
1185

1186
    final int hedgingAttemptCount;
1187

1188
    /** Null until committed. */
1189
    @Nullable final Substream winningSubstream;
1190

1191
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1192
    final boolean cancelled;
1193

1194
    /** No more hedging due to events like drop or pushback. */
1195
    final boolean hedgingFrozen;
1196

1197
    State(
1198
        @Nullable List<BufferEntry> buffer,
1199
        Collection<Substream> drainedSubstreams,
1200
        Collection<Substream> activeHedges,
1201
        @Nullable Substream winningSubstream,
1202
        boolean cancelled,
1203
        boolean passThrough,
1204
        boolean hedgingFrozen,
1205
        int hedgingAttemptCount) {
1✔
1206
      this.buffer = buffer;
1✔
1207
      this.drainedSubstreams =
1✔
1208
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1209
      this.winningSubstream = winningSubstream;
1✔
1210
      this.activeHedges = activeHedges;
1✔
1211
      this.cancelled = cancelled;
1✔
1212
      this.passThrough = passThrough;
1✔
1213
      this.hedgingFrozen = hedgingFrozen;
1✔
1214
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1215

1216
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1217
      checkState(
1✔
1218
          !passThrough || winningSubstream != null,
1219
          "passThrough should imply winningSubstream != null");
1220
      checkState(
1✔
1221
          !passThrough
1222
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1223
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1224
          "passThrough should imply winningSubstream is drained");
1225
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1226
    }
1✔
1227

1228
    @CheckReturnValue
1229
    // GuardedBy RetriableStream.lock
1230
    State cancelled() {
1231
      return new State(
1✔
1232
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1233
          hedgingFrozen, hedgingAttemptCount);
1234
    }
1235

1236
    /** The given substream is drained. */
1237
    @CheckReturnValue
1238
    // GuardedBy RetriableStream.lock
1239
    State substreamDrained(Substream substream) {
1240
      checkState(!passThrough, "Already passThrough");
1✔
1241

1242
      Collection<Substream> drainedSubstreams;
1243
      
1244
      if (substream.closed) {
1✔
1245
        drainedSubstreams = this.drainedSubstreams;
1✔
1246
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1247
        // optimize for 0-retry, which is most of the cases.
1248
        drainedSubstreams = Collections.singletonList(substream);
1✔
1249
      } else {
1250
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1251
        drainedSubstreams.add(substream);
1✔
1252
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1253
      }
1254

1255
      boolean passThrough = winningSubstream != null;
1✔
1256

1257
      List<BufferEntry> buffer = this.buffer;
1✔
1258
      if (passThrough) {
1✔
1259
        checkState(
1✔
1260
            winningSubstream == substream, "Another RPC attempt has already committed");
1261
        buffer = null;
1✔
1262
      }
1263

1264
      return new State(
1✔
1265
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1266
          hedgingFrozen, hedgingAttemptCount);
1267
    }
1268

1269
    /** The given substream is closed. */
1270
    @CheckReturnValue
1271
    // GuardedBy RetriableStream.lock
1272
    State substreamClosed(Substream substream) {
1273
      substream.closed = true;
1✔
1274
      if (this.drainedSubstreams.contains(substream)) {
1✔
1275
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1276
        drainedSubstreams.remove(substream);
1✔
1277
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1278
        return new State(
1✔
1279
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1280
            hedgingFrozen, hedgingAttemptCount);
1281
      } else {
1282
        return this;
1✔
1283
      }
1284
    }
1285

1286
    @CheckReturnValue
1287
    // GuardedBy RetriableStream.lock
1288
    State committed(Substream winningSubstream) {
1289
      checkState(this.winningSubstream == null, "Already committed");
1✔
1290

1291
      boolean passThrough = false;
1✔
1292
      List<BufferEntry> buffer = this.buffer;
1✔
1293
      Collection<Substream> drainedSubstreams;
1294

1295
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1296
        passThrough = true;
1✔
1297
        buffer = null;
1✔
1298
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1299
      } else {
1300
        drainedSubstreams = Collections.emptyList();
1✔
1301
      }
1302

1303
      return new State(
1✔
1304
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1305
          hedgingFrozen, hedgingAttemptCount);
1306
    }
1307

1308
    @CheckReturnValue
1309
    // GuardedBy RetriableStream.lock
1310
    State freezeHedging() {
1311
      if (hedgingFrozen) {
1✔
1312
        return this;
×
1313
      }
1314
      return new State(
1✔
1315
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1316
          true, hedgingAttemptCount);
1317
    }
1318

1319
    @CheckReturnValue
1320
    // GuardedBy RetriableStream.lock
1321
    // state.hedgingAttemptCount is modified only here.
1322
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1323
    State addActiveHedge(Substream substream) {
1324
      // hasPotentialHedging must be true
1325
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1326
      checkState(winningSubstream == null, "already committed");
1✔
1327

1328
      Collection<Substream> activeHedges;
1329
      if (this.activeHedges == null) {
1✔
1330
        activeHedges = Collections.singleton(substream);
1✔
1331
      } else {
1332
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1333
        activeHedges.add(substream);
1✔
1334
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1335
      }
1336

1337
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1338
      return new State(
1✔
1339
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1340
          hedgingFrozen, hedgingAttemptCount);
1341
    }
1342

1343
    @CheckReturnValue
1344
    // GuardedBy RetriableStream.lock
1345
    // The method is only called in Sublistener.closed()
1346
    State removeActiveHedge(Substream substream) {
1347
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1348
      activeHedges.remove(substream);
1✔
1349
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1350

1351
      return new State(
1✔
1352
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1353
          hedgingFrozen, hedgingAttemptCount);
1354
    }
1355

1356
    @CheckReturnValue
1357
    // GuardedBy RetriableStream.lock
1358
    // The method is only called for transparent retry.
1359
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1360
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1361
      activeHedges.remove(oldOne);
1✔
1362
      activeHedges.add(newOne);
1✔
1363
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1364

1365
      return new State(
1✔
1366
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1367
          hedgingFrozen, hedgingAttemptCount);
1368
    }
1369
  }
1370

1371
  /**
1372
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1373
   *  attributes.
1374
   */
1375
  private static final class Substream {
1376
    ClientStream stream;
1377

1378
    // GuardedBy RetriableStream.lock
1379
    boolean closed;
1380

1381
    // setting to true must be GuardedBy RetriableStream.lock
1382
    boolean bufferLimitExceeded;
1383

1384
    final int previousAttemptCount;
1385

1386
    Substream(int previousAttemptCount) {
1✔
1387
      this.previousAttemptCount = previousAttemptCount;
1✔
1388
    }
1✔
1389
  }
1390

1391

1392
  /**
1393
   * Traces the buffer used by a substream.
1394
   */
1395
  class BufferSizeTracer extends ClientStreamTracer {
1396
    // Each buffer size tracer is dedicated to one specific substream.
1397
    private final Substream substream;
1398

1399
    @GuardedBy("lock")
1400
    long bufferNeeded;
1401

1402
    BufferSizeTracer(Substream substream) {
1✔
1403
      this.substream = substream;
1✔
1404
    }
1✔
1405

1406
    /**
1407
     * A message is sent to the wire, so its reference would be released if no retry or
1408
     * hedging were involved. So at this point we have to hold the reference of the message longer
1409
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1410
     */
1411
    @Override
1412
    public void outboundWireSize(long bytes) {
1413
      if (state.winningSubstream != null) {
1✔
1414
        return;
1✔
1415
      }
1416

1417
      Runnable postCommitTask = null;
1✔
1418

1419
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1420
      synchronized (lock) {
1✔
1421
        if (state.winningSubstream != null || substream.closed) {
1✔
1422
          return;
×
1423
        }
1424
        bufferNeeded += bytes;
1✔
1425
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1426
          return;
1✔
1427
        }
1428

1429
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1430
          substream.bufferLimitExceeded = true;
1✔
1431
        } else {
1432
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1433
          long savedChannelBufferUsed =
1✔
1434
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1435
          perRpcBufferUsed = bufferNeeded;
1✔
1436

1437
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1438
            substream.bufferLimitExceeded = true;
1✔
1439
          }
1440
        }
1441

1442
        if (substream.bufferLimitExceeded) {
1✔
1443
          postCommitTask = commit(substream);
1✔
1444
        }
1445
      }
1✔
1446

1447
      if (postCommitTask != null) {
1✔
1448
        postCommitTask.run();
1✔
1449
      }
1450
    }
1✔
1451
  }
1452

1453
  /**
1454
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1455
   *  the Channel. There should be a single instance of it for each channel.
1456
   */
1457
  static final class ChannelBufferMeter {
1✔
1458
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1459

1460
    @VisibleForTesting
1461
    long addAndGet(long newBytesUsed) {
1462
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1463
    }
1464
  }
1465

1466
  /**
1467
   * Used for retry throttling.
1468
   */
1469
  static final class Throttle {
1470

1471
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1472

1473
    /**
1474
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1475
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1476
     * maxTokens.
1477
     */
1478
    final int maxTokens;
1479

1480
    /**
1481
     * Half of {@code maxTokens}.
1482
     */
1483
    final int threshold;
1484

1485
    /**
1486
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1487
     */
1488
    final int tokenRatio;
1489

1490
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1491

1492
    Throttle(float maxTokens, float tokenRatio) {
1✔
1493
      // tokenRatio is up to 3 decimal places
1494
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1495
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1496
      this.threshold = this.maxTokens / 2;
1✔
1497
      tokenCount.set(this.maxTokens);
1✔
1498
    }
1✔
1499

1500
    @VisibleForTesting
1501
    boolean isAboveThreshold() {
1502
      return tokenCount.get() > threshold;
1✔
1503
    }
1504

1505
    /**
1506
     * Counts down the token on qualified failure and checks if it is above the threshold
1507
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1508
     * a not-to-retry pushback.
1509
     */
1510
    @VisibleForTesting
1511
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1512
      while (true) {
1513
        int currentCount = tokenCount.get();
1✔
1514
        if (currentCount == 0) {
1✔
1515
          return false;
1✔
1516
        }
1517
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1518
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1519
        if (updated) {
1✔
1520
          return decremented > threshold;
1✔
1521
        }
1522
      }
×
1523
    }
1524

1525
    @VisibleForTesting
1526
    void onSuccess() {
1527
      while (true) {
1528
        int currentCount = tokenCount.get();
1✔
1529
        if (currentCount == maxTokens) {
1✔
1530
          break;
1✔
1531
        }
1532
        int incremented = currentCount + tokenRatio;
1✔
1533
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1534
        if (updated) {
1✔
1535
          break;
1✔
1536
        }
1537
      }
×
1538
    }
1✔
1539

1540
    @Override
1541
    public boolean equals(Object o) {
1542
      if (this == o) {
1✔
1543
        return true;
×
1544
      }
1545
      if (!(o instanceof Throttle)) {
1✔
1546
        return false;
×
1547
      }
1548
      Throttle that = (Throttle) o;
1✔
1549
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1550
    }
1551

1552
    @Override
1553
    public int hashCode() {
1554
      return Objects.hashCode(maxTokens, tokenRatio);
×
1555
    }
1556
  }
1557

1558
  private static final class RetryPlan {
1559
    final boolean shouldRetry;
1560
    final long backoffNanos;
1561

1562
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1563
      this.shouldRetry = shouldRetry;
1✔
1564
      this.backoffNanos = backoffNanos;
1✔
1565
    }
1✔
1566
  }
1567

1568
  private static final class HedgingPlan {
1569
    final boolean isHedgeable;
1570
    @Nullable
1571
    final Integer hedgingPushbackMillis;
1572

1573
    public HedgingPlan(
1574
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1575
      this.isHedgeable = isHedgeable;
1✔
1576
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1577
    }
1✔
1578
  }
1579

1580
  /** Allows cancelling a Future without racing with setting the future. */
1581
  private static final class FutureCanceller {
1582

1583
    final Object lock;
1584
    @GuardedBy("lock")
1585
    Future<?> future;
1586
    @GuardedBy("lock")
1587
    boolean cancelled;
1588

1589
    FutureCanceller(Object lock) {
1✔
1590
      this.lock = lock;
1✔
1591
    }
1✔
1592

1593
    void setFuture(Future<?> future) {
1594
      boolean wasCancelled;
1595
      synchronized (lock) {
1✔
1596
        wasCancelled = cancelled;
1✔
1597
        if (!wasCancelled) {
1✔
1598
          this.future = future;
1✔
1599
        }
1600
      }
1✔
1601
      if (wasCancelled) {
1✔
1602
        future.cancel(false);
×
1603
      }
1604
    }
1✔
1605

1606
    @GuardedBy("lock")
1607
    @CheckForNull // Must cancel the returned future if not null.
1608
    Future<?> markCancelled() {
1609
      cancelled = true;
1✔
1610
      return future;
1✔
1611
    }
1612

1613
    @GuardedBy("lock")
1614
    boolean isCancelled() {
1615
      return cancelled;
1✔
1616
    }
1617
  }
1618
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc