• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20005

01 Oct 2025 02:05AM UTC coverage: 88.585% (+0.003%) from 88.582%
#20005

push

github

web-flow
netty: Unconditionally disable adaptive cumulator (#12390)

io.netty.util.Version is unreliable, so we stop using it. grpc-netty and
grpc-netty-shaded have their version.properties mix, and you can't tell
which is which.

Changed the tests to use assume, so it is clear in the results that they
weren't run.

34859 of 39351 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.07
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.Compressor;
30
import io.grpc.Deadline;
31
import io.grpc.DecompressorRegistry;
32
import io.grpc.Metadata;
33
import io.grpc.MethodDescriptor;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import java.io.InputStream;
38
import java.lang.Thread.UncaughtExceptionHandler;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.List;
43
import java.util.Random;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.concurrent.atomic.AtomicInteger;
50
import java.util.concurrent.atomic.AtomicLong;
51
import javax.annotation.CheckForNull;
52
import javax.annotation.Nullable;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  /** Header added to retry/hedging attempts carrying the number of earlier attempts. */
  @VisibleForTesting
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);

  /**
   * Key for the server pushback header. Parsing of this header is outside this view; it
   * presumably feeds the {@code delayMillis} given to pushbackHedging — confirm.
   */
  @VisibleForTesting
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);

  /** Status used to cancel non-winning substreams once this stream has committed. */
  private static final Status CANCELLED_BECAUSE_COMMITTED =
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");

  private final MethodDescriptor<ReqT, ?> method;
  // Runs post-commit tasks and hedging work off the caller's thread.
  private final Executor callExecutor;
  // Serializes every callback delivered to masterListener (onReady, closed). An exception
  // escaping a task is converted with Status.fromThrowable and re-thrown as a runtime exception.
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
      new UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          throw Status.fromThrowable(e)
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
              .asRuntimeException();
        }
      }
  );
  // Schedules delayed hedging attempts (and presumably retries; that code is outside this view).
  private final ScheduledExecutorService scheduledExecutorService;
  // Must not modify it. Each attempt gets its own copy via updateHeaders().
  private final Metadata headers;
  @Nullable
  private final RetryPolicy retryPolicy;
  @Nullable
  private final HedgingPolicy hedgingPolicy;
  // True iff a hedgingPolicy was supplied; retryPolicy and hedgingPolicy are mutually exclusive.
  private final boolean isHedging;

  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
  private final Object lock = new Object();

  // Channel-wide buffer usage; this RPC's share (perRpcBufferUsed) is subtracted on commit.
  private final ChannelBufferMeter channelBufferUsed;
  private final long perRpcBufferLimit;
  private final long channelBufferLimit;
  @Nullable
  private final Throttle throttle;
  // Diagnostics about substreams that have closed, reported by appendTimeoutInsight().
  @GuardedBy("lock")
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();

  // Current retry/hedging state. It is only reassigned while holding lock; it is volatile so that
  // lock-free fast paths (e.g. sendMessage/request/flush) can read a consistent snapshot.
  private volatile State state = new State(
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
      false, 0);

  /**
   * Either non-local transparent retry happened or reached server's application logic.
   *
   * <p>Note that local-only transparent retries are unlimited.
   */
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
  // Number of substreams that have been created and not yet finished. safeCloseMasterListener()
  // adds Integer.MIN_VALUE to request closing. Once the value is negative, createSubstream()
  // refuses to create new attempts. Whoever brings the value to exactly Integer.MIN_VALUE
  // delivers the deferred close to masterListener.
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
  // The close outcome saved by safeCloseMasterListener() for whoever finally delivers the close.
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;

  // Used for recording the share of buffer used for the current call out of the channel buffer.
  // This field would not be necessary if there is no channel buffer limit.
  @GuardedBy("lock")
  private long perRpcBufferUsed;

  // The application's listener, set in start().
  private ClientStreamListener masterListener;
  @GuardedBy("lock")
  private FutureCanceller scheduledRetry;
  @GuardedBy("lock")
  private FutureCanceller scheduledHedging;
  private long nextBackoffIntervalNanos;
  // Set by cancel() when the committed substream is still draining; drain() then cancels that
  // substream with this status.
  private Status cancellationStatus;
  // Set on listenerSerializeExecutor right before masterListener.closed(); suppresses a later
  // onReady() callback.
  private boolean isClosed;
126

127
  /**
   * Creates a retriable stream. At most one of {@code retryPolicy} and {@code hedgingPolicy}
   * may be non-null; supplying both fails with {@link IllegalArgumentException}.
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method, Metadata headers,
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
      @Nullable Throttle throttle) {
    checkArgument(
        retryPolicy == null || hedgingPolicy == null,
        "Should not provide both retryPolicy and hedgingPolicy");

    // Call description and per-call inputs.
    this.method = method;
    this.headers = headers;

    // Buffer accounting.
    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;

    // Executors.
    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;

    // Policies.
    this.retryPolicy = retryPolicy;
    this.hedgingPolicy = hedgingPolicy;
    this.isHedging = hedgingPolicy != null;
    this.throttle = throttle;

    if (retryPolicy != null) {
      // The first backoff interval comes from the retry policy.
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
    }
  }
1✔
151

152
  /**
   * Commits the RPC to {@code winningSubstream} if no substream has won yet.
   *
   * <p>Under {@code lock} this records the winner in {@code state}, subtracts this RPC's buffer
   * share ({@code perRpcBufferUsed}) from {@code channelBufferUsed}, and detaches any scheduled
   * retry and hedging timers. Work that must not run under the lock is packaged into the returned
   * task: cancelling the other drained substreams, cancelling the detached timer futures, and
   * finally calling {@link #postCommit()}.
   *
   * @return the post-commit task, or {@code null} if the stream was already committed
   */
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
  @Nullable // null if already committed
  @CheckReturnValue
  private Runnable commit(final Substream winningSubstream) {
    synchronized (lock) {
      if (state.winningSubstream != null) {
        return null;
      }
      // Snapshot taken before committing: these are the substreams the task may need to cancel.
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;

      state = state.committed(winningSubstream);

      // subtract the share of this RPC from channelBufferUsed.
      channelBufferUsed.addAndGet(-perRpcBufferUsed);

      // Whether a pending retry timer had already been cancelled before this commit. If it had,
      // its in-flight slot is presumably released elsewhere, so the task below does not decrement
      // inFlightSubStreams for it — confirm against the retry scheduling code (outside this view).
      final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
      final Future<?> retryFuture;
      if (scheduledRetry != null) {
        retryFuture = scheduledRetry.markCancelled();
        scheduledRetry = null;
      } else {
        retryFuture = null;
      }
      // cancel the scheduled hedging if it is scheduled prior to the commitment
      final Future<?> hedgingFuture;
      if (scheduledHedging != null) {
        hedgingFuture = scheduledHedging.markCancelled();
        scheduledHedging = null;
      } else {
        hedgingFuture = null;
      }

      class CommitTask implements Runnable {
        @Override
        public void run() {
          // For hedging only, not needed for normal retry
          for (Substream substream : savedDrainedSubstreams) {
            if (substream != winningSubstream) {
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
            }
          }
          if (retryFuture != null) {
            retryFuture.cancel(false);
            // Release the in-flight slot held by the cancelled retry. If that brings the counter
            // to exactly Integer.MIN_VALUE, a close was already requested (safeCloseMasterListener)
            // and this was the last slot, so deliver the saved close here.
            if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
              assert savedCloseMasterListenerReason != null;
              listenerSerializeExecutor.execute(
                  new Runnable() {
                    @Override
                    public void run() {
                      isClosed = true;
                      masterListener.closed(savedCloseMasterListenerReason.status,
                          savedCloseMasterListenerReason.progress,
                          savedCloseMasterListenerReason.metadata);
                    }
                  });
            }
          }

          if (hedgingFuture != null) {
            hedgingFuture.cancel(false);
          }

          postCommit();
        }
      }

      return new CommitTask();
    }
  }
221

222
  /**
   * Subclass hook run as the last step of the post-commit task returned by {@code commit}. It
   * runs after the non-winning drained substreams and any pending retry/hedging futures have
   * been cancelled.
   */
  abstract void postCommit();
223

224
  /**
225
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
226
   * for only once. The post commit task cancels other non-winning streams on separate transport
227
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
228
   * transports.(issues/10314)
229
   * This method should be called only in subListener callbacks. This guarantees callExecutor
230
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
231
   * decorative. That is because:
232
   * For a successful winning stream, other streams won't attempt to close master listener.
233
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
234
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
235
   * callExecutor scheduling happens-before that.
236
   */
237
  private void commitAndRun(Substream winningSubstream) {
238
    Runnable postCommitTask = commit(winningSubstream);
1✔
239

240
    if (postCommitTask != null) {
1✔
241
      callExecutor.execute(postCommitTask);
1✔
242
    }
243
  }
1✔
244

245
  // returns null means we should not create new sub streams, e.g. cancelled or
246
  // other close condition is met for retriableStream.
247
  @Nullable
248
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry,
249
                                    boolean isHedgedStream) {
250
    int inFlight;
251
    do {
252
      inFlight = inFlightSubStreams.get();
1✔
253
      if (inFlight < 0) {
1✔
254
        return null;
×
255
      }
256
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
257
    Substream sub = new Substream(previousAttemptCount);
1✔
258
    // one tracer per substream
259
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
260
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
261
      @Override
262
      public ClientStreamTracer newClientStreamTracer(
263
          ClientStreamTracer.StreamInfo info, Metadata headers) {
264
        return bufferSizeTracer;
1✔
265
      }
266
    };
267

268
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
269
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
270
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry,
1✔
271
        isHedgedStream);
272
    return sub;
1✔
273
  }
274

275
  /**
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
   * Client stream is not yet started.
   *
   * @param headers headers for this attempt; createSubstream passes a fresh copy produced by
   *     updateHeaders, never the shared original
   * @param tracerFactory factory whose tracers observe this attempt
   * @param previousAttempts number of attempts made before this one
   * @param isTransparentRetry whether this attempt is a transparent retry
   * @param isHedgedStream whether this attempt is a hedge
   * @return the new, unstarted stream
   */
  abstract ClientStream newSubstream(
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
      boolean isTransparentRetry, boolean isHedgedStream);
282

283
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
284
  @VisibleForTesting
285
  final Metadata updateHeaders(
286
      Metadata originalHeaders, int previousAttemptCount) {
287
    Metadata newHeaders = new Metadata();
1✔
288
    newHeaders.merge(originalHeaders);
1✔
289
    if (previousAttemptCount > 0) {
1✔
290
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
291
    }
292
    return newHeaders;
1✔
293
  }
294

295
  /**
   * Replays the buffered entries onto {@code substream} until it catches up with the buffer.
   *
   * <p>Entries are copied out of {@code state.buffer} in chunks of up to {@code chunk} under
   * {@code lock}, then run on the substream outside the lock. When the substream reaches the end
   * of the buffer, it is marked drained under the lock, so later entries are applied to it
   * directly by delayOrExecute. If it is then ready, an onReady() callback is scheduled.
   *
   * <p>If another substream wins or the stream is cancelled mid-drain, the loop stops. The
   * substream is then started (if not yet started) and cancelled.
   */
  private void drain(Substream substream) {
    int index = 0;
    int chunk = 0x80;
    List<BufferEntry> list = null;
    boolean streamStarted = false;
    Runnable onReadyRunnable = null;

    while (true) {
      State savedState;

      synchronized (lock) {
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me, to be cancelled
          break;
        }
        if (savedState.cancelled) {
          break;
        }
        if (index == savedState.buffer.size()) { // I'm drained
          state = savedState.substreamDrained(substream);
          if (!isReady()) {
            return;
          }
          onReadyRunnable = new Runnable() {
            @Override
            public void run() {
              if (!isClosed) {
                masterListener.onReady();
              }
            }
          };
          break;
        }

        if (substream.closed) {
          return;
        }

        // Copy the next chunk so it can be replayed without holding the lock. The list is
        // reused across iterations.
        int stop = Math.min(index + chunk, savedState.buffer.size());
        if (list == null) {
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
        } else {
          list.clear();
          list.addAll(savedState.buffer.subList(index, stop));
        }
        index = stop;
      }

      for (BufferEntry bufferEntry : list) {
        bufferEntry.runWith(substream);
        if (bufferEntry instanceof RetriableStream.StartEntry) {
          streamStarted = true;
        }
        // These breaks leave only the for loop. The next pass of the while loop re-checks the
        // same conditions under the lock and exits there.
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me, to be cancelled
          break;
        }
        if (savedState.cancelled) {
          break;
        }
      }
    }

    if (onReadyRunnable != null) {
      listenerSerializeExecutor.execute(onReadyRunnable);
      return;
    }

    if (!streamStarted) {
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
      substream.stream.start(new Sublistener(substream));
    }
    // If this substream is itself the winner, cancel() deferred its cancellation to this point
    // and stored the reason in cancellationStatus. Otherwise it lost to another substream.
    substream.stream.cancel(
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
  }
1✔
372

373
  /**
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
   *
   * @return {@code null} to proceed with the first attempt; otherwise the status with which
   *     start() cancels this stream instead of creating any attempt
   */
  @CheckReturnValue
  @Nullable
  abstract Status prestart();
379

380
  class StartEntry implements BufferEntry {
1✔
381
    @Override
382
    public void runWith(Substream substream) {
383
      substream.stream.start(new Sublistener(substream));
1✔
384
    }
1✔
385
  }
386

387
  /**
   * Starts the first RPC attempt.
   *
   * <p>If prestart() reports a shutdown status, the stream is cancelled with it and no attempt is
   * created. Otherwise a StartEntry is buffered and the first substream is created. With hedging,
   * that substream is registered as an active hedge and the next hedge may be scheduled. Finally
   * the substream is drained.
   */
  @Override
  public final void start(ClientStreamListener listener) {
    masterListener = listener;

    Status shutdownStatus = prestart();

    if (shutdownStatus != null) {
      cancel(shutdownStatus);
      return;
    }

    // The StartEntry is buffered so that every attempt, including later ones, starts by
    // replaying it.
    synchronized (lock) {
      state.buffer.add(new StartEntry());
    }

    Substream substream = createSubstream(0, false, false);
    if (substream == null) {
      return;
    }
    if (isHedging) {
      FutureCanceller scheduledHedgingRef = null;

      synchronized (lock) {
        state = state.addActiveHedge(substream);
        if (hasPotentialHedging(state)
            && (throttle == null || throttle.isAboveThreshold())) {
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
        }
      }

      // Schedule outside the lock; the FutureCanceller was published under the lock above.
      if (scheduledHedgingRef != null) {
        scheduledHedgingRef.setFuture(
            scheduledExecutorService.schedule(
                new HedgingRunnable(scheduledHedgingRef),
                hedgingPolicy.hedgingDelayNanos,
                TimeUnit.NANOSECONDS));
      }
    }

    drain(substream);
  }
1✔
429

430
  /**
   * Applies a server pushback delay to hedging.
   *
   * <ul>
   *   <li>{@code null}: no-op.
   *   <li>negative: stop all further hedging (freezeHedging()).
   *   <li>otherwise: if a hedge is currently scheduled, replace it with one scheduled
   *       {@code delayMillis} milliseconds from now.
   * </ul>
   */
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
  private void pushbackHedging(@Nullable Integer delayMillis) {
    if (delayMillis == null) {
      return;
    }
    if (delayMillis < 0) {
      freezeHedging();
      return;
    }

    // Cancels the current scheduledHedging and reschedules a new one.
    FutureCanceller future;
    Future<?> futureToBeCancelled;

    synchronized (lock) {
      if (scheduledHedging == null) {
        return;
      }

      // Mark the old canceller under the lock; the actual Future.cancel happens outside it.
      futureToBeCancelled = scheduledHedging.markCancelled();
      scheduledHedging = future = new FutureCanceller(lock);
    }

    if (futureToBeCancelled != null) {
      futureToBeCancelled.cancel(false);
    }
    future.setFuture(scheduledExecutorService.schedule(
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
  }
1✔
459

460
  /**
   * Timer task that launches one hedged attempt.
   *
   * <p>It reserves a substream immediately, then continues on callExecutor. There, under
   * {@code lock}, it either discards the attempt (if its FutureCanceller was cancelled in the
   * meantime) or registers it as an active hedge. In the latter case it also schedules the next
   * hedge or freezes hedging, then drains the new attempt.
   */
  private final class HedgingRunnable implements Runnable {

    // Need to hold a ref to the FutureCanceller in case RetriableStream.scheduledHedging is renewed
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
    final FutureCanceller scheduledHedgingRef;

    HedgingRunnable(FutureCanceller scheduledHedging) {
      scheduledHedgingRef = scheduledHedging;
    }

    @Override
    public void run() {
      // It's safe to read state.hedgingAttemptCount here.
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
      // until state.addActiveHedge() is called subsequently, even though other parts of the
      // state could change.
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false, true);
      if (newSubstream == null) {
        return;
      }
      callExecutor.execute(
          new Runnable() {
            @SuppressWarnings("GuardedBy")  //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
            @Override
            public void run() {
              boolean cancelled = false;
              FutureCanceller future = null;

              synchronized (lock) {
                if (scheduledHedgingRef.isCancelled()) {
                  cancelled = true;
                } else {
                  state = state.addActiveHedge(newSubstream);
                  if (hasPotentialHedging(state)
                      && (throttle == null || throttle.isAboveThreshold())) {
                    scheduledHedging = future = new FutureCanceller(lock);
                  } else {
                    state = state.freezeHedging();
                    scheduledHedging = null;
                  }
                }
              }

              if (cancelled) {
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
                newSubstream.stream.start(new Sublistener(newSubstream));
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
                return;
              }
              // Schedule the next hedge outside the lock.
              if (future != null) {
                future.setFuture(
                    scheduledExecutorService.schedule(
                        new HedgingRunnable(future),
                        hedgingPolicy.hedgingDelayNanos,
                        TimeUnit.NANOSECONDS));
              }
              drain(newSubstream);
            }
          });
    }
  }
521

522
  /**
   * Cancels the RPC with {@code reason}.
   *
   * <p>If nothing has committed yet, this commits to a no-op substream and runs the post-commit
   * task inline, which cancels the other drained substreams. It then requests the master listener
   * close with {@code reason}. If a real substream has already won, that substream is cancelled:
   * immediately if it has finished draining, otherwise drain() cancels it later using
   * {@code cancellationStatus}.
   */
  @Override
  public final void cancel(final Status reason) {
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
    noopSubstream.stream = new NoopClientStream();
    Runnable runnable = commit(noopSubstream);

    if (runnable != null) {
      synchronized (lock) {
        state = state.substreamDrained(noopSubstream);
      }
      runnable.run();
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
      return;
    }

    Substream winningSubstreamToCancel = null;
    synchronized (lock) {
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
        winningSubstreamToCancel = state.winningSubstream;
      } else { // the winningSubstream will be cancelled while draining
        cancellationStatus = reason;
      }
      state = state.cancelled();
    }
    if (winningSubstreamToCancel != null) {
      winningSubstreamToCancel.stream.cancel(reason);
    }
  }
1✔
550

551
  private void delayOrExecute(BufferEntry bufferEntry) {
552
    Collection<Substream> savedDrainedSubstreams;
553
    synchronized (lock) {
1✔
554
      if (!state.passThrough) {
1✔
555
        state.buffer.add(bufferEntry);
1✔
556
      }
557
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
558
    }
1✔
559

560
    for (Substream substream : savedDrainedSubstreams) {
1✔
561
      bufferEntry.runWith(substream);
1✔
562
    }
1✔
563
  }
1✔
564

565
  /**
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
   * InputStream for buffering: messages are buffered as {@code ReqT} values and serialized per
   * attempt with {@code method.streamRequest}.
   *
   * @throws IllegalStateException always
   */
  @Override
  public final void writeMessage(InputStream message) {
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
  }
573

574
  final void sendMessage(final ReqT message) {
575
    State savedState = state;
1✔
576
    if (savedState.passThrough) {
1✔
577
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
578
      return;
1✔
579
    }
580

581
    class SendMessageEntry implements BufferEntry {
1✔
582
      @Override
583
      public void runWith(Substream substream) {
584
        substream.stream.writeMessage(method.streamRequest(message));
1✔
585
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
586
        // flushes (or half close), but retry appears to have a code path that the flushes may
587
        // not happen. The code needs to be fixed and this removed. See #9340.
588
        substream.stream.flush();
1✔
589
      }
1✔
590
    }
591

592
    delayOrExecute(new SendMessageEntry());
1✔
593
  }
1✔
594

595
  @Override
596
  public final void request(final int numMessages) {
597
    State savedState = state;
1✔
598
    if (savedState.passThrough) {
1✔
599
      savedState.winningSubstream.stream.request(numMessages);
1✔
600
      return;
1✔
601
    }
602

603
    class RequestEntry implements BufferEntry {
1✔
604
      @Override
605
      public void runWith(Substream substream) {
606
        substream.stream.request(numMessages);
1✔
607
      }
1✔
608
    }
609

610
    delayOrExecute(new RequestEntry());
1✔
611
  }
1✔
612

613
  @Override
614
  public final void flush() {
615
    State savedState = state;
1✔
616
    if (savedState.passThrough) {
1✔
617
      savedState.winningSubstream.stream.flush();
1✔
618
      return;
1✔
619
    }
620

621
    class FlushEntry implements BufferEntry {
1✔
622
      @Override
623
      public void runWith(Substream substream) {
624
        substream.stream.flush();
1✔
625
      }
1✔
626
    }
627

628
    delayOrExecute(new FlushEntry());
1✔
629
  }
1✔
630

631
  @Override
632
  public final boolean isReady() {
633
    for (Substream substream : state.drainedSubstreams) {
1✔
634
      if (substream.stream.isReady()) {
1✔
635
        return true;
1✔
636
      }
637
    }
1✔
638
    return false;
1✔
639
  }
640

641
  @Override
642
  public void optimizeForDirectExecutor() {
643
    class OptimizeDirectEntry implements BufferEntry {
1✔
644
      @Override
645
      public void runWith(Substream substream) {
646
        substream.stream.optimizeForDirectExecutor();
1✔
647
      }
1✔
648
    }
649

650
    delayOrExecute(new OptimizeDirectEntry());
1✔
651
  }
1✔
652

653
  @Override
654
  public final void setCompressor(final Compressor compressor) {
655
    class CompressorEntry implements BufferEntry {
1✔
656
      @Override
657
      public void runWith(Substream substream) {
658
        substream.stream.setCompressor(compressor);
1✔
659
      }
1✔
660
    }
661

662
    delayOrExecute(new CompressorEntry());
1✔
663
  }
1✔
664

665
  @Override
666
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
667
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
668
      @Override
669
      public void runWith(Substream substream) {
670
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
671
      }
1✔
672
    }
673

674
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
675
  }
1✔
676

677
  @Override
678
  public final void setMessageCompression(final boolean enable) {
679
    class MessageCompressionEntry implements BufferEntry {
1✔
680
      @Override
681
      public void runWith(Substream substream) {
682
        substream.stream.setMessageCompression(enable);
1✔
683
      }
1✔
684
    }
685

686
    delayOrExecute(new MessageCompressionEntry());
1✔
687
  }
1✔
688

689
  @Override
690
  public final void halfClose() {
691
    class HalfCloseEntry implements BufferEntry {
1✔
692
      @Override
693
      public void runWith(Substream substream) {
694
        substream.stream.halfClose();
1✔
695
      }
1✔
696
    }
697

698
    delayOrExecute(new HalfCloseEntry());
1✔
699
  }
1✔
700

701
  @Override
702
  public final void setAuthority(final String authority) {
703
    class AuthorityEntry implements BufferEntry {
1✔
704
      @Override
705
      public void runWith(Substream substream) {
706
        substream.stream.setAuthority(authority);
1✔
707
      }
1✔
708
    }
709

710
    delayOrExecute(new AuthorityEntry());
1✔
711
  }
1✔
712

713
  @Override
714
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
715
    class DecompressorRegistryEntry implements BufferEntry {
1✔
716
      @Override
717
      public void runWith(Substream substream) {
718
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
719
      }
1✔
720
    }
721

722
    delayOrExecute(new DecompressorRegistryEntry());
1✔
723
  }
1✔
724

725
  @Override
726
  public final void setMaxInboundMessageSize(final int maxSize) {
727
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
728
      @Override
729
      public void runWith(Substream substream) {
730
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
731
      }
1✔
732
    }
733

734
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
735
  }
1✔
736

737
  @Override
738
  public final void setMaxOutboundMessageSize(final int maxSize) {
739
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
740
      @Override
741
      public void runWith(Substream substream) {
742
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
743
      }
1✔
744
    }
745

746
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
747
  }
1✔
748

749
  @Override
750
  public final void setDeadline(final Deadline deadline) {
751
    class DeadlineEntry implements BufferEntry {
1✔
752
      @Override
753
      public void runWith(Substream substream) {
754
        substream.stream.setDeadline(deadline);
1✔
755
      }
1✔
756
    }
757

758
    delayOrExecute(new DeadlineEntry());
1✔
759
  }
1✔
760

761
  @Override
762
  public final Attributes getAttributes() {
763
    if (state.winningSubstream != null) {
1✔
764
      return state.winningSubstream.stream.getAttributes();
1✔
765
    }
766
    return Attributes.EMPTY;
×
767
  }
768

769
  @Override
770
  public void appendTimeoutInsight(InsightBuilder insight) {
771
    State currentState;
772
    synchronized (lock) {
1✔
773
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
774
      currentState = state;
1✔
775
    }
1✔
776
    if (currentState.winningSubstream != null) {
1✔
777
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
778
      // of the winning substream, they may not have received closed() notifications yet, thus they
779
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
780
      // We need to figure out how to include them.
781
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
782
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
783
      insight.appendKeyValue("committed", substreamInsight);
1✔
784
    } else {
1✔
785
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
786
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
787
      // and are still catching up with buffered requests (in other words, still draining) will not
788
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
789
      // be indistinguishable from the case where those streams are to be created a little late due
790
      // to delays in the timer.
791
      for (Substream sub : currentState.drainedSubstreams) {
1✔
792
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
793
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
794
        openSubstreamsInsight.append(substreamInsight);
1✔
795
      }
1✔
796
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
797
    }
798
  }
1✔
799

800
  // Shared source of randomness, replaceable in tests via setRandom(). Its readers are outside
  // this view (presumably retry backoff/jitter — confirm). Neither volatile nor synchronized.
  private static Random random = new Random();

  /** Test-only hook that replaces the shared {@link Random} for all instances. */
  @VisibleForTesting
  static void setRandom(Random random) {
    RetriableStream.random = random;
  }
1✔
806

807
  /**
808
   * Whether there is any potential hedge at the moment. A false return value implies there is
809
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
810
   * when calling this method, unless otherwise the rpc is committed.
811
   */
812
  // only called when isHedging is true
813
  @GuardedBy("lock")
814
  private boolean hasPotentialHedging(State state) {
815
    return state.winningSubstream == null
1✔
816
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
817
        && !state.hedgingFrozen;
818
  }
819

820
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
821
  private void freezeHedging() {
822
    Future<?> futureToBeCancelled = null;
1✔
823
    synchronized (lock) {
1✔
824
      if (scheduledHedging != null) {
1✔
825
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
826
        scheduledHedging = null;
1✔
827
      }
828
      state = state.freezeHedging();
1✔
829
    }
1✔
830

831
    if (futureToBeCancelled != null) {
1✔
832
      futureToBeCancelled.cancel(false);
1✔
833
    }
834
  }
1✔
835

836
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
837
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
838
        metadata);
839
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
840
      listenerSerializeExecutor.execute(
1✔
841
          new Runnable() {
1✔
842
            @Override
843
            public void run() {
844
              isClosed = true;
1✔
845
              masterListener.closed(status, progress, metadata);
1✔
846
            }
1✔
847
          });
848
    }
849
  }
1✔
850

851
  private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil
1✔
852
          .getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true);
1✔
853

854
  public static long intervalWithJitter(long intervalNanos) {
855
    double inverseJitterFactor = isExperimentalRetryJitterEnabled
1✔
856
            ? 0.8 * random.nextDouble() + 0.4 : random.nextDouble();
1✔
857
    return (long) (intervalNanos * inverseJitterFactor);
1✔
858
  }
859

860
  private static final class SavedCloseMasterListenerReason {
861
    private final Status status;
862
    private final RpcProgress progress;
863
    private final Metadata metadata;
864

865
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
866
      this.status = status;
1✔
867
      this.progress = progress;
1✔
868
      this.metadata = metadata;
1✔
869
    }
1✔
870
  }
871

872
  private interface BufferEntry {
873
    /** Replays the buffer entry with the given stream. */
874
    void runWith(Substream substream);
875
  }
876

877
  private final class Sublistener implements ClientStreamListener {
1✔
878
    final Substream substream;
879

880
    Sublistener(Substream substream) {
1✔
881
      this.substream = substream;
1✔
882
    }
1✔
883

884
    @Override
885
    public void headersRead(final Metadata headers) {
886
      if (substream.previousAttemptCount > 0) {
1✔
887
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
888
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
889
      }
890
      commitAndRun(substream);
1✔
891
      if (state.winningSubstream == substream) {
1✔
892
        if (throttle != null) {
1✔
893
          throttle.onSuccess();
1✔
894
        }
895
        listenerSerializeExecutor.execute(
1✔
896
            new Runnable() {
1✔
897
              @Override
898
              public void run() {
899
                masterListener.headersRead(headers);
1✔
900
              }
1✔
901
            });
902
      }
903
    }
1✔
904

905
    @Override
906
    public void closed(
907
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
908
      synchronized (lock) {
1✔
909
        state = state.substreamClosed(substream);
1✔
910
        closedSubstreamsInsight.append(status.getCode());
1✔
911
      }
1✔
912

913
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
914
        assert savedCloseMasterListenerReason != null;
1✔
915
        listenerSerializeExecutor.execute(
1✔
916
            new Runnable() {
1✔
917
              @Override
918
              public void run() {
919
                isClosed = true;
1✔
920
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
921
                    savedCloseMasterListenerReason.progress,
1✔
922
                    savedCloseMasterListenerReason.metadata);
1✔
923
              }
1✔
924
            });
925
        return;
1✔
926
      }
927

928
      // handle a race between buffer limit exceeded and closed, when setting
929
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
930
      if (substream.bufferLimitExceeded) {
1✔
931
        commitAndRun(substream);
1✔
932
        if (state.winningSubstream == substream) {
1✔
933
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
934
        }
935
        return;
1✔
936
      }
937
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
938
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
939
        commitAndRun(substream);
1✔
940
        if (state.winningSubstream == substream) {
1✔
941
          Status tooManyTransparentRetries = Status.INTERNAL
1✔
942
              .withDescription("Too many transparent retries. Might be a bug in gRPC")
1✔
943
              .withCause(status.asRuntimeException());
1✔
944
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
945
        }
946
        return;
1✔
947
      }
948

949
      if (state.winningSubstream == null) {
1✔
950
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
951
            || (rpcProgress == RpcProgress.REFUSED
952
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
953
          // transparent retry
954
          final Substream newSubstream = createSubstream(substream.previousAttemptCount,
1✔
955
              true, false);
956
          if (newSubstream == null) {
1✔
957
            return;
×
958
          }
959
          if (isHedging) {
1✔
960
            synchronized (lock) {
1✔
961
              // Although this operation is not done atomically with
962
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
963
              // activeHedges, so neither does it affect the commitment decision of other threads,
964
              // nor do the commitment decision making threads affect itself.
965
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
966
            }
1✔
967
          }
968

969
          callExecutor.execute(new Runnable() {
1✔
970
            @Override
971
            public void run() {
972
              drain(newSubstream);
1✔
973
            }
1✔
974
          });
975
          return;
1✔
976
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
977
          // For normal retry, nothing need be done here, will just commit.
978
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
979
          if (isHedging) {
1✔
980
            freezeHedging();
×
981
          }
982
        } else {
983
          noMoreTransparentRetry.set(true);
1✔
984

985
          if (isHedging) {
1✔
986
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
987
            if (hedgingPlan.isHedgeable) {
1✔
988
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
989
            }
990
            synchronized (lock) {
1✔
991
              state = state.removeActiveHedge(substream);
1✔
992
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
993
              // Once hasPotentialHedging(state) is false, it will always be false, and then
994
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
995
              // multiple concurrent hedges, one of the hedges will end up committed.
996
              if (hedgingPlan.isHedgeable) {
1✔
997
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
998
                  return;
1✔
999
                }
1000
                // else, no activeHedges, no new hedges possible, try to commit
1001
              } // else, isHedgeable is false, try to commit
1002
            }
1✔
1003
          } else {
1✔
1004
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
1005
            if (retryPlan.shouldRetry) {
1✔
1006
              // retry
1007
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1,
1✔
1008
                  false, false);
1009
              if (newSubstream == null) {
1✔
1010
                return;
×
1011
              }
1012
              // The check state.winningSubstream == null, checking if is not already committed, is
1013
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1014
              FutureCanceller scheduledRetryCopy;
1015
              synchronized (lock) {
1✔
1016
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1017
              }
1✔
1018

1019
              class RetryBackoffRunnable implements Runnable {
1✔
1020
                @Override
1021
                @SuppressWarnings("FutureReturnValueIgnored")
1022
                public void run() {
1023
                  synchronized (scheduledRetryCopy.lock) {
1✔
1024
                    if (scheduledRetryCopy.isCancelled()) {
1✔
1025
                      return;
×
1026
                    } else {
1027
                      scheduledRetryCopy.markCancelled();
1✔
1028
                    }
1029
                  }
1✔
1030

1031
                  callExecutor.execute(
1✔
1032
                      new Runnable() {
1✔
1033
                        @Override
1034
                        public void run() {
1035
                          drain(newSubstream);
1✔
1036
                        }
1✔
1037
                      });
1038
                }
1✔
1039
              }
1040

1041
              scheduledRetryCopy.setFuture(
1✔
1042
                  scheduledExecutorService.schedule(
1✔
1043
                      new RetryBackoffRunnable(),
1044
                      retryPlan.backoffNanos,
1045
                      TimeUnit.NANOSECONDS));
1046
              return;
1✔
1047
            }
1048
          }
1049
        }
1050
      }
1051

1052
      commitAndRun(substream);
1✔
1053
      if (state.winningSubstream == substream) {
1✔
1054
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1055
      }
1056
    }
1✔
1057

1058
    /**
1059
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1060
     * long the backoff should be. The decision does not take the commitment status into account, so
1061
     * caller should check it separately. It also updates the throttle. It does not change state.
1062
     */
1063
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1064
      if (retryPolicy == null) {
1✔
1065
        return new RetryPlan(false, 0);
1✔
1066
      }
1067
      boolean shouldRetry = false;
1✔
1068
      long backoffNanos = 0L;
1✔
1069
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1070
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1071
      boolean isThrottled = false;
1✔
1072
      if (throttle != null) {
1✔
1073
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1074
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1075
        }
1076
      }
1077

1078
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1079
        if (pushbackMillis == null) {
1✔
1080
          if (isRetryableStatusCode) {
1✔
1081
            shouldRetry = true;
1✔
1082
            backoffNanos = intervalWithJitter(nextBackoffIntervalNanos);
1✔
1083
            nextBackoffIntervalNanos = Math.min(
1✔
1084
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1085
                retryPolicy.maxBackoffNanos);
1✔
1086
          } // else no retry
1087
        } else if (pushbackMillis >= 0) {
1✔
1088
          shouldRetry = true;
1✔
1089
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1090
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1091
        } // else no retry
1092
      } // else no retry
1093

1094
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1095
    }
1096

1097
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1098
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1099
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1100
      boolean isThrottled = false;
1✔
1101
      if (throttle != null) {
1✔
1102
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1103
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1104
        }
1105
      }
1106
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1107
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1108
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1109
      }
1110
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1111
    }
1112

1113
    @Nullable
1114
    private Integer getPushbackMills(Metadata trailer) {
1115
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1116
      Integer pushbackMillis = null;
1✔
1117
      if (pushbackStr != null) {
1✔
1118
        try {
1119
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1120
        } catch (NumberFormatException e) {
1✔
1121
          pushbackMillis = -1;
1✔
1122
        }
1✔
1123
      }
1124
      return pushbackMillis;
1✔
1125
    }
1126

1127
    @Override
1128
    public void messagesAvailable(final MessageProducer producer) {
1129
      State savedState = state;
1✔
1130
      checkState(
1✔
1131
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1132
      if (savedState.winningSubstream != substream) {
1✔
1133
        GrpcUtil.closeQuietly(producer);
1✔
1134
        return;
1✔
1135
      }
1136
      listenerSerializeExecutor.execute(
1✔
1137
          new Runnable() {
1✔
1138
            @Override
1139
            public void run() {
1140
              masterListener.messagesAvailable(producer);
1✔
1141
            }
1✔
1142
          });
1143
    }
1✔
1144

1145
    @Override
1146
    public void onReady() {
1147
      // FIXME(#7089): hedging case is broken.
1148
      if (!isReady()) {
1✔
1149
        return;
1✔
1150
      }
1151
      listenerSerializeExecutor.execute(
1✔
1152
          new Runnable() {
1✔
1153
            @Override
1154
            public void run() {
1155
              if (!isClosed) {
1✔
1156
                masterListener.onReady();
1✔
1157
              }
1158
            }
1✔
1159
          });
1160
    }
1✔
1161
  }
1162

1163
  private static final class State {
1164
    /** Committed and the winning substream drained. */
1165
    final boolean passThrough;
1166

1167
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1168
    @Nullable final List<BufferEntry> buffer;
1169

1170
    /**
1171
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1172
     * passThrough; Empty if committed but not passTrough.
1173
     */
1174
    final Collection<Substream> drainedSubstreams;
1175

1176
    /**
1177
     * Unmodifiable collection of all the active hedging substreams.
1178
     *
1179
     * <p>A substream even with the attribute substream.closed being true may be considered still
1180
     * "active" at the moment as long as it is in this collection.
1181
     */
1182
    final Collection<Substream> activeHedges; // not null once isHedging = true
1183

1184
    final int hedgingAttemptCount;
1185

1186
    /** Null until committed. */
1187
    @Nullable final Substream winningSubstream;
1188

1189
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1190
    final boolean cancelled;
1191

1192
    /** No more hedging due to events like drop or pushback. */
1193
    final boolean hedgingFrozen;
1194

1195
    State(
1196
        @Nullable List<BufferEntry> buffer,
1197
        Collection<Substream> drainedSubstreams,
1198
        Collection<Substream> activeHedges,
1199
        @Nullable Substream winningSubstream,
1200
        boolean cancelled,
1201
        boolean passThrough,
1202
        boolean hedgingFrozen,
1203
        int hedgingAttemptCount) {
1✔
1204
      this.buffer = buffer;
1✔
1205
      this.drainedSubstreams =
1✔
1206
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1207
      this.winningSubstream = winningSubstream;
1✔
1208
      this.activeHedges = activeHedges;
1✔
1209
      this.cancelled = cancelled;
1✔
1210
      this.passThrough = passThrough;
1✔
1211
      this.hedgingFrozen = hedgingFrozen;
1✔
1212
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1213

1214
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1215
      checkState(
1✔
1216
          !passThrough || winningSubstream != null,
1217
          "passThrough should imply winningSubstream != null");
1218
      checkState(
1✔
1219
          !passThrough
1220
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1221
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1222
          "passThrough should imply winningSubstream is drained");
1223
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1224
    }
1✔
1225

1226
    @CheckReturnValue
1227
    // GuardedBy RetriableStream.lock
1228
    State cancelled() {
1229
      return new State(
1✔
1230
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1231
          hedgingFrozen, hedgingAttemptCount);
1232
    }
1233

1234
    /** The given substream is drained. */
1235
    @CheckReturnValue
1236
    // GuardedBy RetriableStream.lock
1237
    State substreamDrained(Substream substream) {
1238
      checkState(!passThrough, "Already passThrough");
1✔
1239

1240
      Collection<Substream> drainedSubstreams;
1241
      
1242
      if (substream.closed) {
1✔
1243
        drainedSubstreams = this.drainedSubstreams;
1✔
1244
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1245
        // optimize for 0-retry, which is most of the cases.
1246
        drainedSubstreams = Collections.singletonList(substream);
1✔
1247
      } else {
1248
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1249
        drainedSubstreams.add(substream);
1✔
1250
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1251
      }
1252

1253
      boolean passThrough = winningSubstream != null;
1✔
1254

1255
      List<BufferEntry> buffer = this.buffer;
1✔
1256
      if (passThrough) {
1✔
1257
        checkState(
1✔
1258
            winningSubstream == substream, "Another RPC attempt has already committed");
1259
        buffer = null;
1✔
1260
      }
1261

1262
      return new State(
1✔
1263
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1264
          hedgingFrozen, hedgingAttemptCount);
1265
    }
1266

1267
    /** The given substream is closed. */
1268
    @CheckReturnValue
1269
    // GuardedBy RetriableStream.lock
1270
    State substreamClosed(Substream substream) {
1271
      substream.closed = true;
1✔
1272
      if (this.drainedSubstreams.contains(substream)) {
1✔
1273
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1274
        drainedSubstreams.remove(substream);
1✔
1275
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1276
        return new State(
1✔
1277
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1278
            hedgingFrozen, hedgingAttemptCount);
1279
      } else {
1280
        return this;
1✔
1281
      }
1282
    }
1283

1284
    @CheckReturnValue
1285
    // GuardedBy RetriableStream.lock
1286
    State committed(Substream winningSubstream) {
1287
      checkState(this.winningSubstream == null, "Already committed");
1✔
1288

1289
      boolean passThrough = false;
1✔
1290
      List<BufferEntry> buffer = this.buffer;
1✔
1291
      Collection<Substream> drainedSubstreams;
1292

1293
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1294
        passThrough = true;
1✔
1295
        buffer = null;
1✔
1296
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1297
      } else {
1298
        drainedSubstreams = Collections.emptyList();
1✔
1299
      }
1300

1301
      return new State(
1✔
1302
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1303
          hedgingFrozen, hedgingAttemptCount);
1304
    }
1305

1306
    @CheckReturnValue
1307
    // GuardedBy RetriableStream.lock
1308
    State freezeHedging() {
1309
      if (hedgingFrozen) {
1✔
1310
        return this;
×
1311
      }
1312
      return new State(
1✔
1313
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1314
          true, hedgingAttemptCount);
1315
    }
1316

1317
    @CheckReturnValue
1318
    // GuardedBy RetriableStream.lock
1319
    // state.hedgingAttemptCount is modified only here.
1320
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1321
    State addActiveHedge(Substream substream) {
1322
      // hasPotentialHedging must be true
1323
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1324
      checkState(winningSubstream == null, "already committed");
1✔
1325

1326
      Collection<Substream> activeHedges;
1327
      if (this.activeHedges == null) {
1✔
1328
        activeHedges = Collections.singleton(substream);
1✔
1329
      } else {
1330
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1331
        activeHedges.add(substream);
1✔
1332
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1333
      }
1334

1335
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1336
      return new State(
1✔
1337
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1338
          hedgingFrozen, hedgingAttemptCount);
1339
    }
1340

1341
    @CheckReturnValue
1342
    // GuardedBy RetriableStream.lock
1343
    // The method is only called in Sublistener.closed()
1344
    State removeActiveHedge(Substream substream) {
1345
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1346
      activeHedges.remove(substream);
1✔
1347
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1348

1349
      return new State(
1✔
1350
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1351
          hedgingFrozen, hedgingAttemptCount);
1352
    }
1353

1354
    @CheckReturnValue
1355
    // GuardedBy RetriableStream.lock
1356
    // The method is only called for transparent retry.
1357
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1358
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1359
      activeHedges.remove(oldOne);
1✔
1360
      activeHedges.add(newOne);
1✔
1361
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1362

1363
      return new State(
1✔
1364
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1365
          hedgingFrozen, hedgingAttemptCount);
1366
    }
1367
  }
1368

1369
  /**
1370
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1371
   *  attributes.
1372
   */
1373
  private static final class Substream {
1374
    ClientStream stream;
1375

1376
    // GuardedBy RetriableStream.lock
1377
    boolean closed;
1378

1379
    // setting to true must be GuardedBy RetriableStream.lock
1380
    boolean bufferLimitExceeded;
1381

1382
    final int previousAttemptCount;
1383

1384
    Substream(int previousAttemptCount) {
1✔
1385
      this.previousAttemptCount = previousAttemptCount;
1✔
1386
    }
1✔
1387
  }
1388

1389

1390
  /**
1391
   * Traces the buffer used by a substream.
1392
   */
1393
  class BufferSizeTracer extends ClientStreamTracer {
1394
    // Each buffer size tracer is dedicated to one specific substream.
1395
    private final Substream substream;
1396

1397
    @GuardedBy("lock")
1398
    long bufferNeeded;
1399

1400
    BufferSizeTracer(Substream substream) {
1✔
1401
      this.substream = substream;
1✔
1402
    }
1✔
1403

1404
    /**
1405
     * A message is sent to the wire, so its reference would be released if no retry or
1406
     * hedging were involved. So at this point we have to hold the reference of the message longer
1407
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1408
     */
1409
    @Override
1410
    public void outboundWireSize(long bytes) {
1411
      if (state.winningSubstream != null) {
1✔
1412
        return;
1✔
1413
      }
1414

1415
      Runnable postCommitTask = null;
1✔
1416

1417
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1418
      synchronized (lock) {
1✔
1419
        if (state.winningSubstream != null || substream.closed) {
1✔
1420
          return;
×
1421
        }
1422
        bufferNeeded += bytes;
1✔
1423
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1424
          return;
1✔
1425
        }
1426

1427
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1428
          substream.bufferLimitExceeded = true;
1✔
1429
        } else {
1430
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1431
          long savedChannelBufferUsed =
1✔
1432
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1433
          perRpcBufferUsed = bufferNeeded;
1✔
1434

1435
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1436
            substream.bufferLimitExceeded = true;
1✔
1437
          }
1438
        }
1439

1440
        if (substream.bufferLimitExceeded) {
1✔
1441
          postCommitTask = commit(substream);
1✔
1442
        }
1443
      }
1✔
1444

1445
      if (postCommitTask != null) {
1✔
1446
        postCommitTask.run();
1✔
1447
      }
1448
    }
1✔
1449
  }
1450

1451
  /**
1452
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1453
   *  the Channel. There should be a single instance of it for each channel.
1454
   */
1455
  static final class ChannelBufferMeter {
1✔
1456
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1457

1458
    @VisibleForTesting
1459
    long addAndGet(long newBytesUsed) {
1460
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1461
    }
1462
  }
1463

1464
  /**
1465
   * Used for retry throttling.
1466
   */
1467
  static final class Throttle {
1468

1469
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1470

1471
    /**
1472
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1473
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1474
     * maxTokens.
1475
     */
1476
    final int maxTokens;
1477

1478
    /**
1479
     * Half of {@code maxTokens}.
1480
     */
1481
    final int threshold;
1482

1483
    /**
1484
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1485
     */
1486
    final int tokenRatio;
1487

1488
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1489

1490
    Throttle(float maxTokens, float tokenRatio) {
1✔
1491
      // tokenRatio is up to 3 decimal places
1492
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1493
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1494
      this.threshold = this.maxTokens / 2;
1✔
1495
      tokenCount.set(this.maxTokens);
1✔
1496
    }
1✔
1497

1498
    @VisibleForTesting
1499
    boolean isAboveThreshold() {
1500
      return tokenCount.get() > threshold;
1✔
1501
    }
1502

1503
    /**
1504
     * Counts down the token on qualified failure and checks if it is above the threshold
1505
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1506
     * a not-to-retry pushback.
1507
     */
1508
    @VisibleForTesting
1509
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1510
      while (true) {
1511
        int currentCount = tokenCount.get();
1✔
1512
        if (currentCount == 0) {
1✔
1513
          return false;
1✔
1514
        }
1515
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1516
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1517
        if (updated) {
1✔
1518
          return decremented > threshold;
1✔
1519
        }
1520
      }
×
1521
    }
1522

1523
    @VisibleForTesting
1524
    void onSuccess() {
1525
      while (true) {
1526
        int currentCount = tokenCount.get();
1✔
1527
        if (currentCount == maxTokens) {
1✔
1528
          break;
1✔
1529
        }
1530
        int incremented = currentCount + tokenRatio;
1✔
1531
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1532
        if (updated) {
1✔
1533
          break;
1✔
1534
        }
1535
      }
×
1536
    }
1✔
1537

1538
    @Override
1539
    public boolean equals(Object o) {
1540
      if (this == o) {
1✔
1541
        return true;
×
1542
      }
1543
      if (!(o instanceof Throttle)) {
1✔
1544
        return false;
×
1545
      }
1546
      Throttle that = (Throttle) o;
1✔
1547
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1548
    }
1549

1550
    @Override
1551
    public int hashCode() {
1552
      return Objects.hashCode(maxTokens, tokenRatio);
×
1553
    }
1554
  }
1555

1556
  private static final class RetryPlan {
1557
    final boolean shouldRetry;
1558
    final long backoffNanos;
1559

1560
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1561
      this.shouldRetry = shouldRetry;
1✔
1562
      this.backoffNanos = backoffNanos;
1✔
1563
    }
1✔
1564
  }
1565

1566
  private static final class HedgingPlan {
1567
    final boolean isHedgeable;
1568
    @Nullable
1569
    final Integer hedgingPushbackMillis;
1570

1571
    public HedgingPlan(
1572
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1573
      this.isHedgeable = isHedgeable;
1✔
1574
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1575
    }
1✔
1576
  }
1577

1578
  /** Allows cancelling a Future without racing with setting the future. */
1579
  private static final class FutureCanceller {
1580

1581
    final Object lock;
1582
    @GuardedBy("lock")
1583
    Future<?> future;
1584
    @GuardedBy("lock")
1585
    boolean cancelled;
1586

1587
    FutureCanceller(Object lock) {
1✔
1588
      this.lock = lock;
1✔
1589
    }
1✔
1590

1591
    void setFuture(Future<?> future) {
1592
      boolean wasCancelled;
1593
      synchronized (lock) {
1✔
1594
        wasCancelled = cancelled;
1✔
1595
        if (!wasCancelled) {
1✔
1596
          this.future = future;
1✔
1597
        }
1598
      }
1✔
1599
      if (wasCancelled) {
1✔
1600
        future.cancel(false);
×
1601
      }
1602
    }
1✔
1603

1604
    @GuardedBy("lock")
1605
    @CheckForNull // Must cancel the returned future if not null.
1606
    Future<?> markCancelled() {
1607
      cancelled = true;
1✔
1608
      return future;
1✔
1609
    }
1610

1611
    @GuardedBy("lock")
1612
    boolean isCancelled() {
1613
      return cancelled;
1✔
1614
    }
1615
  }
1616
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc