• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20172

11 Feb 2026 12:26PM UTC coverage: 88.71% (+0.003%) from 88.707%
#20172

push

github

jdcormie
api: Use io.grpc.Uri for target parsing in ManagedChannelRegistry

35405 of 39911 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.07
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.Compressor;
30
import io.grpc.Deadline;
31
import io.grpc.DecompressorRegistry;
32
import io.grpc.Metadata;
33
import io.grpc.MethodDescriptor;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import java.io.InputStream;
38
import java.lang.Thread.UncaughtExceptionHandler;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.List;
43
import java.util.Random;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.concurrent.atomic.AtomicInteger;
50
import java.util.concurrent.atomic.AtomicLong;
51
import javax.annotation.CheckForNull;
52
import javax.annotation.Nullable;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  @VisibleForTesting
57
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
1✔
58
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
1✔
59

60
  @VisibleForTesting
61
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
1✔
62
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
1✔
63

64
  private static final Status CANCELLED_BECAUSE_COMMITTED =
1✔
65
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
1✔
66

67
  private final MethodDescriptor<ReqT, ?> method;
68
  private final Executor callExecutor;
69
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
1✔
70
      new UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          throw Status.fromThrowable(e)
×
74
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
×
75
              .asRuntimeException();
×
76
        }
77
      }
78
  );
79
  private final ScheduledExecutorService scheduledExecutorService;
80
  // Must not modify it.
81
  private final Metadata headers;
82
  @Nullable
83
  private final RetryPolicy retryPolicy;
84
  @Nullable
85
  private final HedgingPolicy hedgingPolicy;
86
  private final boolean isHedging;
87

88
  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89
  private final Object lock = new Object();
1✔
90

91
  private final ChannelBufferMeter channelBufferUsed;
92
  private final long perRpcBufferLimit;
93
  private final long channelBufferLimit;
94
  @Nullable
95
  private final Throttle throttle;
96
  @GuardedBy("lock")
1✔
97
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98

99
  private volatile State state = new State(
1✔
100
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
1✔
101
      false, 0);
102

103
  /**
104
   * Either non-local transparent retry happened or reached server's application logic.
105
   *
106
   * <p>Note that local-only transparent retries are unlimited.
107
   */
108
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
1✔
109
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
1✔
110
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
1✔
111
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112

113
  // Used for recording the share of buffer used for the current call out of the channel buffer.
114
  // This field would not be necessary if there is no channel buffer limit.
115
  @GuardedBy("lock")
116
  private long perRpcBufferUsed;
117

118
  private ClientStreamListener masterListener;
119
  @GuardedBy("lock")
120
  private FutureCanceller scheduledRetry;
121
  @GuardedBy("lock")
122
  private FutureCanceller scheduledHedging;
123
  private long nextBackoffIntervalNanos;
124
  private Status cancellationStatus;
125
  private boolean isClosed;
126

127
  /**
   * Creates a logical stream that may fan out into several physical attempts.
   *
   * <p>At most one of {@code retryPolicy} and {@code hedgingPolicy} may be non-null; supplying a
   * hedgingPolicy turns this stream into a hedging stream.
   *
   * @throws IllegalArgumentException if both a retry policy and a hedging policy are given
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method, Metadata headers,
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
      @Nullable Throttle throttle) {
    // Retry and hedging are mutually exclusive for a single call.
    checkArgument(
        retryPolicy == null || hedgingPolicy == null,
        "Should not provide both retryPolicy and hedgingPolicy");
    this.method = method;
    this.headers = headers;
    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;
    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;
    this.retryPolicy = retryPolicy;
    this.hedgingPolicy = hedgingPolicy;
    this.isHedging = hedgingPolicy != null;
    this.throttle = throttle;
    // Backoff starts from the retry policy's initial interval.
    if (retryPolicy != null) {
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
    }
  }
151

152
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153
  @Nullable // null if already committed
154
  @CheckReturnValue
155
  private Runnable commit(final Substream winningSubstream) {
156
    synchronized (lock) {
1✔
157
      if (state.winningSubstream != null) {
1✔
158
        return null;
1✔
159
      }
160
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
1✔
161

162
      state = state.committed(winningSubstream);
1✔
163

164
      // subtract the share of this RPC from channelBufferUsed.
165
      channelBufferUsed.addAndGet(-perRpcBufferUsed);
1✔
166

167
      final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
1✔
168
      final Future<?> retryFuture;
169
      final boolean retryWasScheduled = scheduledRetry != null;
1✔
170
      if (retryWasScheduled) {
1✔
171
        retryFuture = scheduledRetry.markCancelled();
1✔
172
        scheduledRetry = null;
1✔
173
      } else {
174
        retryFuture = null;
1✔
175
      }
176
      // cancel the scheduled hedging if it is scheduled prior to the commitment
177
      final Future<?> hedgingFuture;
178
      if (scheduledHedging != null) {
1✔
179
        hedgingFuture = scheduledHedging.markCancelled();
1✔
180
        scheduledHedging = null;
1✔
181
      } else {
182
        hedgingFuture = null;
1✔
183
      }
184

185
      class CommitTask implements Runnable {
1✔
186
        @Override
187
        public void run() {
188
          // For hedging only, not needed for normal retry
189
          for (Substream substream : savedDrainedSubstreams) {
1✔
190
            if (substream != winningSubstream) {
1✔
191
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
1✔
192
            }
193
          }
1✔
194
          if (retryWasScheduled) {
1✔
195
            if (retryFuture != null) {
1✔
196
              retryFuture.cancel(false);
1✔
197
            }
198
            if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
199
              assert savedCloseMasterListenerReason != null;
×
200
              listenerSerializeExecutor.execute(
×
201
                  new Runnable() {
×
202
                    @Override
203
                    public void run() {
204
                      isClosed = true;
×
205
                      masterListener.closed(savedCloseMasterListenerReason.status,
×
206
                          savedCloseMasterListenerReason.progress,
×
207
                          savedCloseMasterListenerReason.metadata);
×
208
                    }
×
209
                  });
210
            }
211
          }
212

213
          if (hedgingFuture != null) {
1✔
214
            hedgingFuture.cancel(false);
1✔
215
          }
216

217
          postCommit();
1✔
218
        }
1✔
219
      }
220

221
      return new CommitTask();
1✔
222
    }
223
  }
224

225
  /**
   * Subclass hook run as the last step of the commit task. By then the losing substreams and any
   * pending retry/hedging timers have been cancelled.
   */
  abstract void postCommit();
226

227
  /**
228
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
229
   * for only once. The post commit task cancels other non-winning streams on separate transport
230
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
231
   * transports.(issues/10314)
232
   * This method should be called only in subListener callbacks. This guarantees callExecutor
233
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
234
   * decorative. That is because:
235
   * For a successful winning stream, other streams won't attempt to close master listener.
236
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
237
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
238
   * callExecutor scheduling happens-before that.
239
   */
240
  private void commitAndRun(Substream winningSubstream) {
241
    Runnable postCommitTask = commit(winningSubstream);
1✔
242

243
    if (postCommitTask != null) {
1✔
244
      callExecutor.execute(postCommitTask);
1✔
245
    }
246
  }
1✔
247

248
  // returns null means we should not create new sub streams, e.g. cancelled or
249
  // other close condition is met for retriableStream.
250
  @Nullable
251
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry,
252
                                    boolean isHedgedStream) {
253
    int inFlight;
254
    do {
255
      inFlight = inFlightSubStreams.get();
1✔
256
      if (inFlight < 0) {
1✔
257
        return null;
×
258
      }
259
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
260
    Substream sub = new Substream(previousAttemptCount);
1✔
261
    // one tracer per substream
262
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
263
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
264
      @Override
265
      public ClientStreamTracer newClientStreamTracer(
266
          ClientStreamTracer.StreamInfo info, Metadata headers) {
267
        return bufferSizeTracer;
1✔
268
      }
269
    };
270

271
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
272
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
273
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry,
1✔
274
        isHedgedStream);
275
    return sub;
1✔
276
  }
277

278
  /**
279
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
280
   * Client stream is not yet started.
281
   */
282
  abstract ClientStream newSubstream(
283
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
284
      boolean isTransparentRetry, boolean isHedgedStream);
285

286
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
287
  @VisibleForTesting
288
  final Metadata updateHeaders(
289
      Metadata originalHeaders, int previousAttemptCount) {
290
    Metadata newHeaders = new Metadata();
1✔
291
    newHeaders.merge(originalHeaders);
1✔
292
    if (previousAttemptCount > 0) {
1✔
293
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
294
    }
295
    return newHeaders;
1✔
296
  }
297

298
  private void drain(Substream substream) {
299
    int index = 0;
1✔
300
    int chunk = 0x80;
1✔
301
    List<BufferEntry> list = null;
1✔
302
    boolean streamStarted = false;
1✔
303
    Runnable onReadyRunnable = null;
1✔
304

305
    while (true) {
306
      State savedState;
307

308
      synchronized (lock) {
1✔
309
        savedState = state;
1✔
310
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
311
          // committed but not me, to be cancelled
312
          break;
1✔
313
        }
314
        if (savedState.cancelled) {
1✔
315
          break;
1✔
316
        }
317
        if (index == savedState.buffer.size()) { // I'm drained
1✔
318
          state = savedState.substreamDrained(substream);
1✔
319
          if (!isReady()) {
1✔
320
            return;
1✔
321
          }
322
          onReadyRunnable = new Runnable() {
1✔
323
            @Override
324
            public void run() {
325
              if (!isClosed) {
1✔
326
                masterListener.onReady();
1✔
327
              }
328
            }
1✔
329
          };
330
          break;
1✔
331
        }
332

333
        if (substream.closed) {
1✔
334
          return;
×
335
        }
336

337
        int stop = Math.min(index + chunk, savedState.buffer.size());
1✔
338
        if (list == null) {
1✔
339
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
1✔
340
        } else {
341
          list.clear();
1✔
342
          list.addAll(savedState.buffer.subList(index, stop));
1✔
343
        }
344
        index = stop;
1✔
345
      }
1✔
346

347
      for (BufferEntry bufferEntry : list) {
1✔
348
        bufferEntry.runWith(substream);
1✔
349
        if (bufferEntry instanceof RetriableStream.StartEntry) {
1✔
350
          streamStarted = true;
1✔
351
        }
352
        savedState = state;
1✔
353
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
354
          // committed but not me, to be cancelled
355
          break;
1✔
356
        }
357
        if (savedState.cancelled) {
1✔
358
          break;
1✔
359
        }
360
      }
1✔
361
    }
1✔
362

363
    if (onReadyRunnable != null) {
1✔
364
      listenerSerializeExecutor.execute(onReadyRunnable);
1✔
365
      return;
1✔
366
    }
367

368
    if (!streamStarted) {
1✔
369
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
370
      substream.stream.start(new Sublistener(substream));
1✔
371
    }
372
    substream.stream.cancel(
1✔
373
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
1✔
374
  }
1✔
375

376
  /**
377
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
378
   */
379
  @CheckReturnValue
380
  @Nullable
381
  abstract Status prestart();
382

383
  class StartEntry implements BufferEntry {
1✔
384
    @Override
385
    public void runWith(Substream substream) {
386
      substream.stream.start(new Sublistener(substream));
1✔
387
    }
1✔
388
  }
389

390
  /** Starts the first RPC attempt. */
391
  @Override
392
  public final void start(ClientStreamListener listener) {
393
    masterListener = listener;
1✔
394

395
    Status shutdownStatus = prestart();
1✔
396

397
    if (shutdownStatus != null) {
1✔
398
      cancel(shutdownStatus);
1✔
399
      return;
1✔
400
    }
401

402
    synchronized (lock) {
1✔
403
      state.buffer.add(new StartEntry());
1✔
404
    }
1✔
405

406
    Substream substream = createSubstream(0, false, false);
1✔
407
    if (substream == null) {
1✔
408
      return;
×
409
    }
410
    if (isHedging) {
1✔
411
      FutureCanceller scheduledHedgingRef = null;
1✔
412

413
      synchronized (lock) {
1✔
414
        state = state.addActiveHedge(substream);
1✔
415
        if (hasPotentialHedging(state)
1✔
416
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
417
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
418
        }
419
      }
1✔
420

421
      if (scheduledHedgingRef != null) {
1✔
422
        scheduledHedgingRef.setFuture(
1✔
423
            scheduledExecutorService.schedule(
1✔
424
                new HedgingRunnable(scheduledHedgingRef),
425
                hedgingPolicy.hedgingDelayNanos,
426
                TimeUnit.NANOSECONDS));
427
      }
428
    }
429

430
    drain(substream);
1✔
431
  }
1✔
432

433
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
434
  private void pushbackHedging(@Nullable Integer delayMillis) {
435
    if (delayMillis == null) {
1✔
436
      return;
1✔
437
    }
438
    if (delayMillis < 0) {
1✔
439
      freezeHedging();
1✔
440
      return;
1✔
441
    }
442

443
    // Cancels the current scheduledHedging and reschedules a new one.
444
    FutureCanceller future;
445
    Future<?> futureToBeCancelled;
446

447
    synchronized (lock) {
1✔
448
      if (scheduledHedging == null) {
1✔
449
        return;
×
450
      }
451

452
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
453
      scheduledHedging = future = new FutureCanceller(lock);
1✔
454
    }
1✔
455

456
    if (futureToBeCancelled != null) {
1✔
457
      futureToBeCancelled.cancel(false);
1✔
458
    }
459
    future.setFuture(scheduledExecutorService.schedule(
1✔
460
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
461
  }
1✔
462

463
  private final class HedgingRunnable implements Runnable {
464

465
    // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
466
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
467
    final FutureCanceller scheduledHedgingRef;
468

469
    HedgingRunnable(FutureCanceller scheduledHedging) {
1✔
470
      scheduledHedgingRef = scheduledHedging;
1✔
471
    }
1✔
472

473
    @Override
474
    public void run() {
475
      // It's safe to read state.hedgingAttemptCount here.
476
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
477
      // until state.addActiveHedge() is called subsequently, even the state could possibly
478
      // change.
479
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false, true);
1✔
480
      if (newSubstream == null) {
1✔
481
        return;
×
482
      }
483
      callExecutor.execute(
1✔
484
          new Runnable() {
1✔
485
            @SuppressWarnings("GuardedBy")  //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
486
            @Override
487
            public void run() {
488
              boolean cancelled = false;
1✔
489
              FutureCanceller future = null;
1✔
490

491
              synchronized (lock) {
1✔
492
                if (scheduledHedgingRef.isCancelled()) {
1✔
493
                  cancelled = true;
×
494
                } else {
495
                  state = state.addActiveHedge(newSubstream);
1✔
496
                  if (hasPotentialHedging(state)
1✔
497
                      && (throttle == null || throttle.isAboveThreshold())) {
1✔
498
                    scheduledHedging = future = new FutureCanceller(lock);
1✔
499
                  } else {
500
                    state = state.freezeHedging();
1✔
501
                    scheduledHedging = null;
1✔
502
                  }
503
                }
504
              }
1✔
505

506
              if (cancelled) {
1✔
507
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
508
                newSubstream.stream.start(new Sublistener(newSubstream));
×
509
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
×
510
                return;
×
511
              }
512
              if (future != null) {
1✔
513
                future.setFuture(
1✔
514
                    scheduledExecutorService.schedule(
1✔
515
                        new HedgingRunnable(future),
516
                        hedgingPolicy.hedgingDelayNanos,
1✔
517
                        TimeUnit.NANOSECONDS));
518
              }
519
              drain(newSubstream);
1✔
520
            }
1✔
521
          });
522
    }
1✔
523
  }
524

525
  @Override
526
  public final void cancel(final Status reason) {
527
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
1✔
528
    noopSubstream.stream = new NoopClientStream();
1✔
529
    Runnable runnable = commit(noopSubstream);
1✔
530

531
    if (runnable != null) {
1✔
532
      synchronized (lock) {
1✔
533
        state = state.substreamDrained(noopSubstream);
1✔
534
      }
1✔
535
      runnable.run();
1✔
536
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
1✔
537
      return;
1✔
538
    }
539

540
    Substream winningSubstreamToCancel = null;
1✔
541
    synchronized (lock) {
1✔
542
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
1✔
543
        winningSubstreamToCancel = state.winningSubstream;
1✔
544
      } else { // the winningSubstream will be cancelled while draining
545
        cancellationStatus = reason;
1✔
546
      }
547
      state = state.cancelled();
1✔
548
    }
1✔
549
    if (winningSubstreamToCancel != null) {
1✔
550
      winningSubstreamToCancel.stream.cancel(reason);
1✔
551
    }
552
  }
1✔
553

554
  private void delayOrExecute(BufferEntry bufferEntry) {
555
    Collection<Substream> savedDrainedSubstreams;
556
    synchronized (lock) {
1✔
557
      if (!state.passThrough) {
1✔
558
        state.buffer.add(bufferEntry);
1✔
559
      }
560
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
561
    }
1✔
562

563
    for (Substream substream : savedDrainedSubstreams) {
1✔
564
      bufferEntry.runWith(substream);
1✔
565
    }
1✔
566
  }
1✔
567

568
  /**
569
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
570
   * InputStream for buffering.
571
   */
572
  @Override
573
  public final void writeMessage(InputStream message) {
574
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
×
575
  }
576

577
  final void sendMessage(final ReqT message) {
578
    State savedState = state;
1✔
579
    if (savedState.passThrough) {
1✔
580
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
581
      return;
1✔
582
    }
583

584
    class SendMessageEntry implements BufferEntry {
1✔
585
      @Override
586
      public void runWith(Substream substream) {
587
        substream.stream.writeMessage(method.streamRequest(message));
1✔
588
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
589
        // flushes (or half close), but retry appears to have a code path that the flushes may
590
        // not happen. The code needs to be fixed and this removed. See #9340.
591
        substream.stream.flush();
1✔
592
      }
1✔
593
    }
594

595
    delayOrExecute(new SendMessageEntry());
1✔
596
  }
1✔
597

598
  @Override
599
  public final void request(final int numMessages) {
600
    State savedState = state;
1✔
601
    if (savedState.passThrough) {
1✔
602
      savedState.winningSubstream.stream.request(numMessages);
1✔
603
      return;
1✔
604
    }
605

606
    class RequestEntry implements BufferEntry {
1✔
607
      @Override
608
      public void runWith(Substream substream) {
609
        substream.stream.request(numMessages);
1✔
610
      }
1✔
611
    }
612

613
    delayOrExecute(new RequestEntry());
1✔
614
  }
1✔
615

616
  @Override
617
  public final void flush() {
618
    State savedState = state;
1✔
619
    if (savedState.passThrough) {
1✔
620
      savedState.winningSubstream.stream.flush();
1✔
621
      return;
1✔
622
    }
623

624
    class FlushEntry implements BufferEntry {
1✔
625
      @Override
626
      public void runWith(Substream substream) {
627
        substream.stream.flush();
1✔
628
      }
1✔
629
    }
630

631
    delayOrExecute(new FlushEntry());
1✔
632
  }
1✔
633

634
  @Override
635
  public final boolean isReady() {
636
    for (Substream substream : state.drainedSubstreams) {
1✔
637
      if (substream.stream.isReady()) {
1✔
638
        return true;
1✔
639
      }
640
    }
1✔
641
    return false;
1✔
642
  }
643

644
  @Override
645
  public void optimizeForDirectExecutor() {
646
    class OptimizeDirectEntry implements BufferEntry {
1✔
647
      @Override
648
      public void runWith(Substream substream) {
649
        substream.stream.optimizeForDirectExecutor();
1✔
650
      }
1✔
651
    }
652

653
    delayOrExecute(new OptimizeDirectEntry());
1✔
654
  }
1✔
655

656
  @Override
657
  public final void setCompressor(final Compressor compressor) {
658
    class CompressorEntry implements BufferEntry {
1✔
659
      @Override
660
      public void runWith(Substream substream) {
661
        substream.stream.setCompressor(compressor);
1✔
662
      }
1✔
663
    }
664

665
    delayOrExecute(new CompressorEntry());
1✔
666
  }
1✔
667

668
  @Override
669
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
670
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
671
      @Override
672
      public void runWith(Substream substream) {
673
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
674
      }
1✔
675
    }
676

677
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
678
  }
1✔
679

680
  @Override
681
  public final void setMessageCompression(final boolean enable) {
682
    class MessageCompressionEntry implements BufferEntry {
1✔
683
      @Override
684
      public void runWith(Substream substream) {
685
        substream.stream.setMessageCompression(enable);
1✔
686
      }
1✔
687
    }
688

689
    delayOrExecute(new MessageCompressionEntry());
1✔
690
  }
1✔
691

692
  @Override
693
  public final void halfClose() {
694
    class HalfCloseEntry implements BufferEntry {
1✔
695
      @Override
696
      public void runWith(Substream substream) {
697
        substream.stream.halfClose();
1✔
698
      }
1✔
699
    }
700

701
    delayOrExecute(new HalfCloseEntry());
1✔
702
  }
1✔
703

704
  @Override
705
  public final void setAuthority(final String authority) {
706
    class AuthorityEntry implements BufferEntry {
1✔
707
      @Override
708
      public void runWith(Substream substream) {
709
        substream.stream.setAuthority(authority);
1✔
710
      }
1✔
711
    }
712

713
    delayOrExecute(new AuthorityEntry());
1✔
714
  }
1✔
715

716
  @Override
717
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
718
    class DecompressorRegistryEntry implements BufferEntry {
1✔
719
      @Override
720
      public void runWith(Substream substream) {
721
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
722
      }
1✔
723
    }
724

725
    delayOrExecute(new DecompressorRegistryEntry());
1✔
726
  }
1✔
727

728
  @Override
729
  public final void setMaxInboundMessageSize(final int maxSize) {
730
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
731
      @Override
732
      public void runWith(Substream substream) {
733
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
734
      }
1✔
735
    }
736

737
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
738
  }
1✔
739

740
  @Override
741
  public final void setMaxOutboundMessageSize(final int maxSize) {
742
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
743
      @Override
744
      public void runWith(Substream substream) {
745
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
746
      }
1✔
747
    }
748

749
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
750
  }
1✔
751

752
  @Override
753
  public final void setDeadline(final Deadline deadline) {
754
    class DeadlineEntry implements BufferEntry {
1✔
755
      @Override
756
      public void runWith(Substream substream) {
757
        substream.stream.setDeadline(deadline);
1✔
758
      }
1✔
759
    }
760

761
    delayOrExecute(new DeadlineEntry());
1✔
762
  }
1✔
763

764
  @Override
765
  public final Attributes getAttributes() {
766
    if (state.winningSubstream != null) {
1✔
767
      return state.winningSubstream.stream.getAttributes();
1✔
768
    }
769
    return Attributes.EMPTY;
×
770
  }
771

772
  @Override
773
  public void appendTimeoutInsight(InsightBuilder insight) {
774
    State currentState;
775
    synchronized (lock) {
1✔
776
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
777
      currentState = state;
1✔
778
    }
1✔
779
    if (currentState.winningSubstream != null) {
1✔
780
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
781
      // of the winning substream, they may not have received closed() notifications yet, thus they
782
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
783
      // We need to figure out how to include them.
784
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
785
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
786
      insight.appendKeyValue("committed", substreamInsight);
1✔
787
    } else {
1✔
788
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
789
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
790
      // and are still catching up with buffered requests (in other words, still draining) will not
791
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
792
      // be indistinguishable from the case where those streams are to be created a little late due
793
      // to delays in the timer.
794
      for (Substream sub : currentState.drainedSubstreams) {
1✔
795
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
796
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
797
        openSubstreamsInsight.append(substreamInsight);
1✔
798
      }
1✔
799
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
800
    }
801
  }
1✔
802

803
  private static Random random = new Random();
1✔
804

805
  @VisibleForTesting
806
  static void setRandom(Random random) {
807
    RetriableStream.random = random;
1✔
808
  }
1✔
809

810
  /**
811
   * Whether there is any potential hedge at the moment. A false return value implies there is
812
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
813
   * when calling this method, unless otherwise the rpc is committed.
814
   */
815
  // only called when isHedging is true
816
  @GuardedBy("lock")
817
  private boolean hasPotentialHedging(State state) {
818
    return state.winningSubstream == null
1✔
819
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
820
        && !state.hedgingFrozen;
821
  }
822

823
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
824
  private void freezeHedging() {
825
    Future<?> futureToBeCancelled = null;
1✔
826
    synchronized (lock) {
1✔
827
      if (scheduledHedging != null) {
1✔
828
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
829
        scheduledHedging = null;
1✔
830
      }
831
      state = state.freezeHedging();
1✔
832
    }
1✔
833

834
    if (futureToBeCancelled != null) {
1✔
835
      futureToBeCancelled.cancel(false);
1✔
836
    }
837
  }
1✔
838

839
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
840
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
841
        metadata);
842
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
843
      listenerSerializeExecutor.execute(
1✔
844
          new Runnable() {
1✔
845
            @Override
846
            public void run() {
847
              isClosed = true;
1✔
848
              masterListener.closed(status, progress, metadata);
1✔
849
            }
1✔
850
          });
851
    }
852
  }
1✔
853

854
  private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil
1✔
855
          .getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true);
1✔
856

857
  public static long intervalWithJitter(long intervalNanos) {
858
    double inverseJitterFactor = isExperimentalRetryJitterEnabled
1✔
859
            ? 0.4 * random.nextDouble() + 0.8 : random.nextDouble();
1✔
860
    return (long) (intervalNanos * inverseJitterFactor);
1✔
861
  }
862

863
  private static final class SavedCloseMasterListenerReason {
864
    private final Status status;
865
    private final RpcProgress progress;
866
    private final Metadata metadata;
867

868
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
869
      this.status = status;
1✔
870
      this.progress = progress;
1✔
871
      this.metadata = metadata;
1✔
872
    }
1✔
873
  }
874

875
  private interface BufferEntry {
876
    /** Replays the buffer entry with the given stream. */
877
    void runWith(Substream substream);
878
  }
879

880
  private final class Sublistener implements ClientStreamListener {
1✔
881
    final Substream substream;
882

883
    Sublistener(Substream substream) {
1✔
884
      this.substream = substream;
1✔
885
    }
1✔
886

887
    @Override
888
    public void headersRead(final Metadata headers) {
889
      if (substream.previousAttemptCount > 0) {
1✔
890
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
891
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
892
      }
893
      commitAndRun(substream);
1✔
894
      if (state.winningSubstream == substream) {
1✔
895
        if (throttle != null) {
1✔
896
          throttle.onSuccess();
1✔
897
        }
898
        listenerSerializeExecutor.execute(
1✔
899
            new Runnable() {
1✔
900
              @Override
901
              public void run() {
902
                masterListener.headersRead(headers);
1✔
903
              }
1✔
904
            });
905
      }
906
    }
1✔
907

908
    @Override
909
    public void closed(
910
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
911
      synchronized (lock) {
1✔
912
        state = state.substreamClosed(substream);
1✔
913
        closedSubstreamsInsight.append(status.getCode());
1✔
914
      }
1✔
915

916
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
917
        assert savedCloseMasterListenerReason != null;
1✔
918
        listenerSerializeExecutor.execute(
1✔
919
            new Runnable() {
1✔
920
              @Override
921
              public void run() {
922
                isClosed = true;
1✔
923
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
924
                    savedCloseMasterListenerReason.progress,
1✔
925
                    savedCloseMasterListenerReason.metadata);
1✔
926
              }
1✔
927
            });
928
        return;
1✔
929
      }
930

931
      // handle a race between buffer limit exceeded and closed, when setting
932
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
933
      if (substream.bufferLimitExceeded) {
1✔
934
        commitAndRun(substream);
1✔
935
        if (state.winningSubstream == substream) {
1✔
936
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
937
        }
938
        return;
1✔
939
      }
940
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
941
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
942
        commitAndRun(substream);
1✔
943
        if (state.winningSubstream == substream) {
1✔
944
          Status tooManyTransparentRetries = GrpcUtil.statusWithDetails(
1✔
945
              Status.Code.INTERNAL, "Too many transparent retries. Might be a bug in gRPC", status);
946
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
947
        }
948
        return;
1✔
949
      }
950

951
      if (state.winningSubstream == null) {
1✔
952
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
953
            || (rpcProgress == RpcProgress.REFUSED
954
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
955
          // transparent retry
956
          final Substream newSubstream = createSubstream(substream.previousAttemptCount,
1✔
957
              true, false);
958
          if (newSubstream == null) {
1✔
959
            return;
×
960
          }
961
          if (isHedging) {
1✔
962
            synchronized (lock) {
1✔
963
              // Although this operation is not done atomically with
964
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
965
              // activeHedges, so neither does it affect the commitment decision of other threads,
966
              // nor do the commitment decision making threads affect itself.
967
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
968
            }
1✔
969
          }
970

971
          callExecutor.execute(new Runnable() {
1✔
972
            @Override
973
            public void run() {
974
              drain(newSubstream);
1✔
975
            }
1✔
976
          });
977
          return;
1✔
978
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
979
          // For normal retry, nothing need be done here, will just commit.
980
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
981
          if (isHedging) {
1✔
982
            freezeHedging();
×
983
          }
984
        } else {
985
          noMoreTransparentRetry.set(true);
1✔
986

987
          if (isHedging) {
1✔
988
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
989
            if (hedgingPlan.isHedgeable) {
1✔
990
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
991
            }
992
            synchronized (lock) {
1✔
993
              state = state.removeActiveHedge(substream);
1✔
994
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
995
              // Once hasPotentialHedging(state) is false, it will always be false, and then
996
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
997
              // multiple concurrent hedges, one of the hedges will end up committed.
998
              if (hedgingPlan.isHedgeable) {
1✔
999
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
1000
                  return;
1✔
1001
                }
1002
                // else, no activeHedges, no new hedges possible, try to commit
1003
              } // else, isHedgeable is false, try to commit
1004
            }
1✔
1005
          } else {
1✔
1006
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
1007
            if (retryPlan.shouldRetry) {
1✔
1008
              // retry
1009
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1,
1✔
1010
                  false, false);
1011
              if (newSubstream == null) {
1✔
1012
                return;
×
1013
              }
1014
              // The check state.winningSubstream == null, checking if is not already committed, is
1015
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1016
              FutureCanceller scheduledRetryCopy;
1017
              synchronized (lock) {
1✔
1018
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1019
              }
1✔
1020

1021
              class RetryBackoffRunnable implements Runnable {
1✔
1022
                @Override
1023
                @SuppressWarnings("FutureReturnValueIgnored")
1024
                public void run() {
1025
                  synchronized (scheduledRetryCopy.lock) {
1✔
1026
                    if (scheduledRetryCopy.isCancelled()) {
1✔
1027
                      return;
×
1028
                    } else {
1029
                      scheduledRetryCopy.markCancelled();
1✔
1030
                    }
1031
                  }
1✔
1032

1033
                  callExecutor.execute(
1✔
1034
                      new Runnable() {
1✔
1035
                        @Override
1036
                        public void run() {
1037
                          drain(newSubstream);
1✔
1038
                        }
1✔
1039
                      });
1040
                }
1✔
1041
              }
1042

1043
              scheduledRetryCopy.setFuture(
1✔
1044
                  scheduledExecutorService.schedule(
1✔
1045
                      new RetryBackoffRunnable(),
1046
                      retryPlan.backoffNanos,
1047
                      TimeUnit.NANOSECONDS));
1048
              return;
1✔
1049
            }
1050
          }
1051
        }
1052
      }
1053

1054
      commitAndRun(substream);
1✔
1055
      if (state.winningSubstream == substream) {
1✔
1056
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1057
      }
1058
    }
1✔
1059

1060
    /**
1061
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1062
     * long the backoff should be. The decision does not take the commitment status into account, so
1063
     * caller should check it separately. It also updates the throttle. It does not change state.
1064
     */
1065
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1066
      if (retryPolicy == null) {
1✔
1067
        return new RetryPlan(false, 0);
1✔
1068
      }
1069
      boolean shouldRetry = false;
1✔
1070
      long backoffNanos = 0L;
1✔
1071
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1072
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1073
      boolean isThrottled = false;
1✔
1074
      if (throttle != null) {
1✔
1075
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1076
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1077
        }
1078
      }
1079

1080
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1081
        if (pushbackMillis == null) {
1✔
1082
          if (isRetryableStatusCode) {
1✔
1083
            shouldRetry = true;
1✔
1084
            backoffNanos = intervalWithJitter(nextBackoffIntervalNanos);
1✔
1085
            nextBackoffIntervalNanos = Math.min(
1✔
1086
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1087
                retryPolicy.maxBackoffNanos);
1✔
1088
          } // else no retry
1089
        } else if (pushbackMillis >= 0) {
1✔
1090
          shouldRetry = true;
1✔
1091
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1092
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1093
        } // else no retry
1094
      } // else no retry
1095

1096
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1097
    }
1098

1099
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1100
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1101
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1102
      boolean isThrottled = false;
1✔
1103
      if (throttle != null) {
1✔
1104
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1105
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1106
        }
1107
      }
1108
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1109
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1110
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1111
      }
1112
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1113
    }
1114

1115
    @Nullable
1116
    private Integer getPushbackMills(Metadata trailer) {
1117
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1118
      Integer pushbackMillis = null;
1✔
1119
      if (pushbackStr != null) {
1✔
1120
        try {
1121
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1122
        } catch (NumberFormatException e) {
1✔
1123
          pushbackMillis = -1;
1✔
1124
        }
1✔
1125
      }
1126
      return pushbackMillis;
1✔
1127
    }
1128

1129
    @Override
1130
    public void messagesAvailable(final MessageProducer producer) {
1131
      State savedState = state;
1✔
1132
      checkState(
1✔
1133
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1134
      if (savedState.winningSubstream != substream) {
1✔
1135
        GrpcUtil.closeQuietly(producer);
1✔
1136
        return;
1✔
1137
      }
1138
      listenerSerializeExecutor.execute(
1✔
1139
          new Runnable() {
1✔
1140
            @Override
1141
            public void run() {
1142
              masterListener.messagesAvailable(producer);
1✔
1143
            }
1✔
1144
          });
1145
    }
1✔
1146

1147
    @Override
1148
    public void onReady() {
1149
      // FIXME(#7089): hedging case is broken.
1150
      if (!isReady()) {
1✔
1151
        return;
1✔
1152
      }
1153
      listenerSerializeExecutor.execute(
1✔
1154
          new Runnable() {
1✔
1155
            @Override
1156
            public void run() {
1157
              if (!isClosed) {
1✔
1158
                masterListener.onReady();
1✔
1159
              }
1160
            }
1✔
1161
          });
1162
    }
1✔
1163
  }
1164

1165
  private static final class State {
1166
    /** Committed and the winning substream drained. */
1167
    final boolean passThrough;
1168

1169
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1170
    @Nullable final List<BufferEntry> buffer;
1171

1172
    /**
1173
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1174
     * passThrough; Empty if committed but not passTrough.
1175
     */
1176
    final Collection<Substream> drainedSubstreams;
1177

1178
    /**
1179
     * Unmodifiable collection of all the active hedging substreams.
1180
     *
1181
     * <p>A substream even with the attribute substream.closed being true may be considered still
1182
     * "active" at the moment as long as it is in this collection.
1183
     */
1184
    final Collection<Substream> activeHedges; // not null once isHedging = true
1185

1186
    final int hedgingAttemptCount;
1187

1188
    /** Null until committed. */
1189
    @Nullable final Substream winningSubstream;
1190

1191
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1192
    final boolean cancelled;
1193

1194
    /** No more hedging due to events like drop or pushback. */
1195
    final boolean hedgingFrozen;
1196

1197
    State(
1198
        @Nullable List<BufferEntry> buffer,
1199
        Collection<Substream> drainedSubstreams,
1200
        Collection<Substream> activeHedges,
1201
        @Nullable Substream winningSubstream,
1202
        boolean cancelled,
1203
        boolean passThrough,
1204
        boolean hedgingFrozen,
1205
        int hedgingAttemptCount) {
1✔
1206
      this.buffer = buffer;
1✔
1207
      this.drainedSubstreams =
1✔
1208
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1209
      this.winningSubstream = winningSubstream;
1✔
1210
      this.activeHedges = activeHedges;
1✔
1211
      this.cancelled = cancelled;
1✔
1212
      this.passThrough = passThrough;
1✔
1213
      this.hedgingFrozen = hedgingFrozen;
1✔
1214
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1215

1216
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1217
      checkState(
1✔
1218
          !passThrough || winningSubstream != null,
1219
          "passThrough should imply winningSubstream != null");
1220
      checkState(
1✔
1221
          !passThrough
1222
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1223
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1224
          "passThrough should imply winningSubstream is drained");
1225
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1226
    }
1✔
1227

1228
    @CheckReturnValue
1229
    // GuardedBy RetriableStream.lock
1230
    State cancelled() {
1231
      return new State(
1✔
1232
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1233
          hedgingFrozen, hedgingAttemptCount);
1234
    }
1235

1236
    /** The given substream is drained. */
1237
    @CheckReturnValue
1238
    // GuardedBy RetriableStream.lock
1239
    State substreamDrained(Substream substream) {
1240
      checkState(!passThrough, "Already passThrough");
1✔
1241

1242
      Collection<Substream> drainedSubstreams;
1243
      
1244
      if (substream.closed) {
1✔
1245
        drainedSubstreams = this.drainedSubstreams;
1✔
1246
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1247
        // optimize for 0-retry, which is most of the cases.
1248
        drainedSubstreams = Collections.singletonList(substream);
1✔
1249
      } else {
1250
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1251
        drainedSubstreams.add(substream);
1✔
1252
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1253
      }
1254

1255
      boolean passThrough = winningSubstream != null;
1✔
1256

1257
      List<BufferEntry> buffer = this.buffer;
1✔
1258
      if (passThrough) {
1✔
1259
        checkState(
1✔
1260
            winningSubstream == substream, "Another RPC attempt has already committed");
1261
        buffer = null;
1✔
1262
      }
1263

1264
      return new State(
1✔
1265
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1266
          hedgingFrozen, hedgingAttemptCount);
1267
    }
1268

1269
    /** The given substream is closed. */
1270
    @CheckReturnValue
1271
    // GuardedBy RetriableStream.lock
1272
    State substreamClosed(Substream substream) {
1273
      substream.closed = true;
1✔
1274
      if (this.drainedSubstreams.contains(substream)) {
1✔
1275
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1276
        drainedSubstreams.remove(substream);
1✔
1277
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1278
        return new State(
1✔
1279
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1280
            hedgingFrozen, hedgingAttemptCount);
1281
      } else {
1282
        return this;
1✔
1283
      }
1284
    }
1285

1286
    @CheckReturnValue
1287
    // GuardedBy RetriableStream.lock
1288
    State committed(Substream winningSubstream) {
1289
      checkState(this.winningSubstream == null, "Already committed");
1✔
1290

1291
      boolean passThrough = false;
1✔
1292
      List<BufferEntry> buffer = this.buffer;
1✔
1293
      Collection<Substream> drainedSubstreams;
1294

1295
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1296
        passThrough = true;
1✔
1297
        buffer = null;
1✔
1298
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1299
      } else {
1300
        drainedSubstreams = Collections.emptyList();
1✔
1301
      }
1302

1303
      return new State(
1✔
1304
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1305
          hedgingFrozen, hedgingAttemptCount);
1306
    }
1307

1308
    @CheckReturnValue
1309
    // GuardedBy RetriableStream.lock
1310
    State freezeHedging() {
1311
      if (hedgingFrozen) {
1✔
1312
        return this;
×
1313
      }
1314
      return new State(
1✔
1315
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1316
          true, hedgingAttemptCount);
1317
    }
1318

1319
    @CheckReturnValue
1320
    // GuardedBy RetriableStream.lock
1321
    // state.hedgingAttemptCount is modified only here.
1322
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1323
    State addActiveHedge(Substream substream) {
1324
      // hasPotentialHedging must be true
1325
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1326
      checkState(winningSubstream == null, "already committed");
1✔
1327

1328
      Collection<Substream> activeHedges;
1329
      if (this.activeHedges == null) {
1✔
1330
        activeHedges = Collections.singleton(substream);
1✔
1331
      } else {
1332
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1333
        activeHedges.add(substream);
1✔
1334
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1335
      }
1336

1337
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1338
      return new State(
1✔
1339
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1340
          hedgingFrozen, hedgingAttemptCount);
1341
    }
1342

1343
    @CheckReturnValue
1344
    // GuardedBy RetriableStream.lock
1345
    // The method is only called in Sublistener.closed()
1346
    State removeActiveHedge(Substream substream) {
1347
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1348
      activeHedges.remove(substream);
1✔
1349
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1350

1351
      return new State(
1✔
1352
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1353
          hedgingFrozen, hedgingAttemptCount);
1354
    }
1355

1356
    @CheckReturnValue
1357
    // GuardedBy RetriableStream.lock
1358
    // The method is only called for transparent retry.
1359
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1360
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1361
      activeHedges.remove(oldOne);
1✔
1362
      activeHedges.add(newOne);
1✔
1363
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1364

1365
      return new State(
1✔
1366
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1367
          hedgingFrozen, hedgingAttemptCount);
1368
    }
1369
  }
1370

1371
  /**
1372
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1373
   *  attributes.
1374
   */
1375
  private static final class Substream {
1376
    ClientStream stream;
1377

1378
    // GuardedBy RetriableStream.lock
1379
    boolean closed;
1380

1381
    // setting to true must be GuardedBy RetriableStream.lock
1382
    boolean bufferLimitExceeded;
1383

1384
    final int previousAttemptCount;
1385

1386
    Substream(int previousAttemptCount) {
1✔
1387
      this.previousAttemptCount = previousAttemptCount;
1✔
1388
    }
1✔
1389
  }
1390

1391

1392
  /**
1393
   * Traces the buffer used by a substream.
1394
   */
1395
  class BufferSizeTracer extends ClientStreamTracer {
1396
    // Each buffer size tracer is dedicated to one specific substream.
1397
    private final Substream substream;
1398

1399
    @GuardedBy("lock")
1400
    long bufferNeeded;
1401

1402
    BufferSizeTracer(Substream substream) {
1✔
1403
      this.substream = substream;
1✔
1404
    }
1✔
1405

1406
    /**
1407
     * A message is sent to the wire, so its reference would be released if no retry or
1408
     * hedging were involved. So at this point we have to hold the reference of the message longer
1409
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1410
     */
1411
    @Override
1412
    public void outboundWireSize(long bytes) {
1413
      if (state.winningSubstream != null) {
1✔
1414
        return;
1✔
1415
      }
1416

1417
      Runnable postCommitTask = null;
1✔
1418

1419
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1420
      synchronized (lock) {
1✔
1421
        if (state.winningSubstream != null || substream.closed) {
1✔
1422
          return;
×
1423
        }
1424
        bufferNeeded += bytes;
1✔
1425
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1426
          return;
1✔
1427
        }
1428

1429
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1430
          substream.bufferLimitExceeded = true;
1✔
1431
        } else {
1432
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1433
          long savedChannelBufferUsed =
1✔
1434
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1435
          perRpcBufferUsed = bufferNeeded;
1✔
1436

1437
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1438
            substream.bufferLimitExceeded = true;
1✔
1439
          }
1440
        }
1441

1442
        if (substream.bufferLimitExceeded) {
1✔
1443
          postCommitTask = commit(substream);
1✔
1444
        }
1445
      }
1✔
1446

1447
      if (postCommitTask != null) {
1✔
1448
        postCommitTask.run();
1✔
1449
      }
1450
    }
1✔
1451
  }
1452

1453
  /**
1454
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1455
   *  the Channel. There should be a single instance of it for each channel.
1456
   */
1457
  static final class ChannelBufferMeter {
1✔
1458
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1459

1460
    @VisibleForTesting
1461
    long addAndGet(long newBytesUsed) {
1462
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1463
    }
1464
  }
1465

1466
  /**
1467
   * Used for retry throttling.
1468
   */
1469
  static final class Throttle {
1470

1471
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1472

1473
    /**
1474
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1475
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1476
     * maxTokens.
1477
     */
1478
    final int maxTokens;
1479

1480
    /**
1481
     * Half of {@code maxTokens}.
1482
     */
1483
    final int threshold;
1484

1485
    /**
1486
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1487
     */
1488
    final int tokenRatio;
1489

1490
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1491

1492
    Throttle(float maxTokens, float tokenRatio) {
1✔
1493
      // tokenRatio is up to 3 decimal places
1494
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1495
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1496
      this.threshold = this.maxTokens / 2;
1✔
1497
      tokenCount.set(this.maxTokens);
1✔
1498
    }
1✔
1499

1500
    @VisibleForTesting
1501
    boolean isAboveThreshold() {
1502
      return tokenCount.get() > threshold;
1✔
1503
    }
1504

1505
    /**
1506
     * Counts down the token on qualified failure and checks if it is above the threshold
1507
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1508
     * a not-to-retry pushback.
1509
     */
1510
    @VisibleForTesting
1511
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1512
      while (true) {
1513
        int currentCount = tokenCount.get();
1✔
1514
        if (currentCount == 0) {
1✔
1515
          return false;
1✔
1516
        }
1517
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1518
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1519
        if (updated) {
1✔
1520
          return decremented > threshold;
1✔
1521
        }
1522
      }
×
1523
    }
1524

1525
    @VisibleForTesting
1526
    void onSuccess() {
1527
      while (true) {
1528
        int currentCount = tokenCount.get();
1✔
1529
        if (currentCount == maxTokens) {
1✔
1530
          break;
1✔
1531
        }
1532
        int incremented = currentCount + tokenRatio;
1✔
1533
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1534
        if (updated) {
1✔
1535
          break;
1✔
1536
        }
1537
      }
×
1538
    }
1✔
1539

1540
    @Override
1541
    public boolean equals(Object o) {
1542
      if (this == o) {
1✔
1543
        return true;
×
1544
      }
1545
      if (!(o instanceof Throttle)) {
1✔
1546
        return false;
×
1547
      }
1548
      Throttle that = (Throttle) o;
1✔
1549
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1550
    }
1551

1552
    @Override
1553
    public int hashCode() {
1554
      return Objects.hashCode(maxTokens, tokenRatio);
×
1555
    }
1556
  }
1557

1558
  private static final class RetryPlan {
1559
    final boolean shouldRetry;
1560
    final long backoffNanos;
1561

1562
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1563
      this.shouldRetry = shouldRetry;
1✔
1564
      this.backoffNanos = backoffNanos;
1✔
1565
    }
1✔
1566
  }
1567

1568
  private static final class HedgingPlan {
1569
    final boolean isHedgeable;
1570
    @Nullable
1571
    final Integer hedgingPushbackMillis;
1572

1573
    public HedgingPlan(
1574
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1575
      this.isHedgeable = isHedgeable;
1✔
1576
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1577
    }
1✔
1578
  }
1579

1580
  /** Allows cancelling a Future without racing with setting the future. */
1581
  private static final class FutureCanceller {
1582

1583
    final Object lock;
1584
    @GuardedBy("lock")
1585
    Future<?> future;
1586
    @GuardedBy("lock")
1587
    boolean cancelled;
1588

1589
    FutureCanceller(Object lock) {
1✔
1590
      this.lock = lock;
1✔
1591
    }
1✔
1592

1593
    void setFuture(Future<?> future) {
1594
      boolean wasCancelled;
1595
      synchronized (lock) {
1✔
1596
        wasCancelled = cancelled;
1✔
1597
        if (!wasCancelled) {
1✔
1598
          this.future = future;
1✔
1599
        }
1600
      }
1✔
1601
      if (wasCancelled) {
1✔
1602
        future.cancel(false);
×
1603
      }
1604
    }
1✔
1605

1606
    @GuardedBy("lock")
1607
    @CheckForNull // Must cancel the returned future if not null.
1608
    Future<?> markCancelled() {
1609
      cancelled = true;
1✔
1610
      return future;
1✔
1611
    }
1612

1613
    @GuardedBy("lock")
1614
    boolean isCancelled() {
1615
      return cancelled;
1✔
1616
    }
1617
  }
1618
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc