• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20098

26 Nov 2025 09:46PM UTC coverage: 88.572% (-0.04%) from 88.614%
#20098

push

github

ejona86
core: Fix shutdown failing accepted RPCs

This fixes a race where RPCs could fail with "UNAVAILABLE: Channel
shutdown invoked" even though they were created before
channel.shutdown().

This basically adopts the internalStart() logic from DelayedStream,
although the stream is a bit different because it has APIs that can be
called before start() and doesn't need to handle cancel() without
start().

The ManagedChannelImplTest had the number of due tasks increase because
start() running earlier creates a DelayedStream. Previously the stream
wasn't created until runDueTasks() so the mockPicker had already been
installed and it could use a real stream from the beginning. But that's
specific to the test; in practice it'd be a delayed stream before and
after this change.

See #12536

35109 of 39639 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.05
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.Compressor;
30
import io.grpc.Deadline;
31
import io.grpc.DecompressorRegistry;
32
import io.grpc.Metadata;
33
import io.grpc.MethodDescriptor;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import java.io.InputStream;
38
import java.lang.Thread.UncaughtExceptionHandler;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.List;
43
import java.util.Random;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.concurrent.atomic.AtomicInteger;
50
import java.util.concurrent.atomic.AtomicLong;
51
import javax.annotation.CheckForNull;
52
import javax.annotation.Nullable;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  @VisibleForTesting
57
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
1✔
58
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
1✔
59

60
  @VisibleForTesting
61
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
1✔
62
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
1✔
63

64
  private static final Status CANCELLED_BECAUSE_COMMITTED =
1✔
65
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
1✔
66

67
  private final MethodDescriptor<ReqT, ?> method;
68
  private final Executor callExecutor;
69
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
1✔
70
      new UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          throw Status.fromThrowable(e)
×
74
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
×
75
              .asRuntimeException();
×
76
        }
77
      }
78
  );
79
  private final ScheduledExecutorService scheduledExecutorService;
80
  // Must not modify it.
81
  private final Metadata headers;
82
  @Nullable
83
  private final RetryPolicy retryPolicy;
84
  @Nullable
85
  private final HedgingPolicy hedgingPolicy;
86
  private final boolean isHedging;
87

88
  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89
  private final Object lock = new Object();
1✔
90

91
  private final ChannelBufferMeter channelBufferUsed;
92
  private final long perRpcBufferLimit;
93
  private final long channelBufferLimit;
94
  @Nullable
95
  private final Throttle throttle;
96
  @GuardedBy("lock")
1✔
97
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98

99
  private volatile State state = new State(
1✔
100
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
1✔
101
      false, 0);
102

103
  /**
104
   * Either non-local transparent retry happened or reached server's application logic.
105
   *
106
   * <p>Note that local-only transparent retries are unlimited.
107
   */
108
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
1✔
109
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
1✔
110
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
1✔
111
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112

113
  // Used for recording the share of buffer used for the current call out of the channel buffer.
114
  // This field would not be necessary if there is no channel buffer limit.
115
  @GuardedBy("lock")
116
  private long perRpcBufferUsed;
117

118
  private ClientStreamListener masterListener;
119
  @GuardedBy("lock")
120
  private FutureCanceller scheduledRetry;
121
  @GuardedBy("lock")
122
  private FutureCanceller scheduledHedging;
123
  private long nextBackoffIntervalNanos;
124
  private Status cancellationStatus;
125
  private boolean isClosed;
126

127
  /**
   * Creates a retriable stream for one logical RPC.
   *
   * <p>At most one of {@code retryPolicy} and {@code hedgingPolicy} may be non-null. Hedging mode
   * is enabled exactly when {@code hedgingPolicy} is provided; when a retry policy is provided its
   * initial backoff seeds the next backoff interval.
   *
   * @throws IllegalArgumentException if both a retry policy and a hedging policy are provided
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method, Metadata headers,
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
      @Nullable Throttle throttle) {
    checkArgument(
        retryPolicy == null || hedgingPolicy == null,
        "Should not provide both retryPolicy and hedgingPolicy");

    // Call identity and execution context.
    this.method = method;
    this.headers = headers;
    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;

    // Buffer accounting.
    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;

    // Retry / hedging configuration.
    this.retryPolicy = retryPolicy;
    if (retryPolicy != null) {
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
    }
    this.hedgingPolicy = hedgingPolicy;
    this.isHedging = hedgingPolicy != null;
    this.throttle = throttle;
  }
1✔
151

152
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153
  @Nullable // null if already committed
154
  @CheckReturnValue
155
  private Runnable commit(final Substream winningSubstream) {
156
    synchronized (lock) {
1✔
157
      if (state.winningSubstream != null) {
1✔
158
        return null;
1✔
159
      }
160
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
1✔
161

162
      state = state.committed(winningSubstream);
1✔
163

164
      // subtract the share of this RPC from channelBufferUsed.
165
      channelBufferUsed.addAndGet(-perRpcBufferUsed);
1✔
166

167
      final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
1✔
168
      final Future<?> retryFuture;
169
      if (scheduledRetry != null) {
1✔
170
        retryFuture = scheduledRetry.markCancelled();
1✔
171
        scheduledRetry = null;
1✔
172
      } else {
173
        retryFuture = null;
1✔
174
      }
175
      // cancel the scheduled hedging if it is scheduled prior to the commitment
176
      final Future<?> hedgingFuture;
177
      if (scheduledHedging != null) {
1✔
178
        hedgingFuture = scheduledHedging.markCancelled();
1✔
179
        scheduledHedging = null;
1✔
180
      } else {
181
        hedgingFuture = null;
1✔
182
      }
183

184
      class CommitTask implements Runnable {
1✔
185
        @Override
186
        public void run() {
187
          // For hedging only, not needed for normal retry
188
          for (Substream substream : savedDrainedSubstreams) {
1✔
189
            if (substream != winningSubstream) {
1✔
190
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
1✔
191
            }
192
          }
1✔
193
          if (retryFuture != null) {
1✔
194
            retryFuture.cancel(false);
1✔
195
            if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
196
              assert savedCloseMasterListenerReason != null;
×
197
              listenerSerializeExecutor.execute(
×
198
                  new Runnable() {
×
199
                    @Override
200
                    public void run() {
201
                      isClosed = true;
×
202
                      masterListener.closed(savedCloseMasterListenerReason.status,
×
203
                          savedCloseMasterListenerReason.progress,
×
204
                          savedCloseMasterListenerReason.metadata);
×
205
                    }
×
206
                  });
207
            }
208
          }
209

210
          if (hedgingFuture != null) {
1✔
211
            hedgingFuture.cancel(false);
1✔
212
          }
213

214
          postCommit();
1✔
215
        }
1✔
216
      }
217

218
      return new CommitTask();
1✔
219
    }
220
  }
221

222
  /**
   * Hook invoked as the last step of the post-commit task produced by {@code commit()}, after the
   * non-winning drained substreams and any detached retry/hedging futures have been cancelled.
   */
  abstract void postCommit();
223

224
  /**
225
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
226
   * for only once. The post commit task cancels other non-winning streams on separate transport
227
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
228
   * transports.(issues/10314)
229
   * This method should be called only in subListener callbacks. This guarantees callExecutor
230
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
231
   * decorative. That is because:
232
   * For a successful winning stream, other streams won't attempt to close master listener.
233
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
234
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
235
   * callExecutor scheduling happens-before that.
236
   */
237
  private void commitAndRun(Substream winningSubstream) {
238
    Runnable postCommitTask = commit(winningSubstream);
1✔
239

240
    if (postCommitTask != null) {
1✔
241
      callExecutor.execute(postCommitTask);
1✔
242
    }
243
  }
1✔
244

245
  // returns null means we should not create new sub streams, e.g. cancelled or
246
  // other close condition is met for retriableStream.
247
  @Nullable
248
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry,
249
                                    boolean isHedgedStream) {
250
    int inFlight;
251
    do {
252
      inFlight = inFlightSubStreams.get();
1✔
253
      if (inFlight < 0) {
1✔
254
        return null;
×
255
      }
256
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
257
    Substream sub = new Substream(previousAttemptCount);
1✔
258
    // one tracer per substream
259
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
260
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
261
      @Override
262
      public ClientStreamTracer newClientStreamTracer(
263
          ClientStreamTracer.StreamInfo info, Metadata headers) {
264
        return bufferSizeTracer;
1✔
265
      }
266
    };
267

268
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
269
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
270
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry,
1✔
271
        isHedgedStream);
272
    return sub;
1✔
273
  }
274

275
  /**
276
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
277
   * Client stream is not yet started.
278
   */
279
  abstract ClientStream newSubstream(
280
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
281
      boolean isTransparentRetry, boolean isHedgedStream);
282

283
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
284
  @VisibleForTesting
285
  final Metadata updateHeaders(
286
      Metadata originalHeaders, int previousAttemptCount) {
287
    Metadata newHeaders = new Metadata();
1✔
288
    newHeaders.merge(originalHeaders);
1✔
289
    if (previousAttemptCount > 0) {
1✔
290
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
291
    }
292
    return newHeaders;
1✔
293
  }
294

295
  private void drain(Substream substream) {
296
    int index = 0;
1✔
297
    int chunk = 0x80;
1✔
298
    List<BufferEntry> list = null;
1✔
299
    boolean streamStarted = false;
1✔
300
    Runnable onReadyRunnable = null;
1✔
301

302
    while (true) {
303
      State savedState;
304

305
      synchronized (lock) {
1✔
306
        savedState = state;
1✔
307
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
308
          // committed but not me, to be cancelled
309
          break;
1✔
310
        }
311
        if (savedState.cancelled) {
1✔
312
          break;
1✔
313
        }
314
        if (index == savedState.buffer.size()) { // I'm drained
1✔
315
          state = savedState.substreamDrained(substream);
1✔
316
          if (!isReady()) {
1✔
317
            return;
1✔
318
          }
319
          onReadyRunnable = new Runnable() {
1✔
320
            @Override
321
            public void run() {
322
              if (!isClosed) {
1✔
323
                masterListener.onReady();
1✔
324
              }
325
            }
1✔
326
          };
327
          break;
1✔
328
        }
329

330
        if (substream.closed) {
1✔
331
          return;
×
332
        }
333

334
        int stop = Math.min(index + chunk, savedState.buffer.size());
1✔
335
        if (list == null) {
1✔
336
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
1✔
337
        } else {
338
          list.clear();
1✔
339
          list.addAll(savedState.buffer.subList(index, stop));
1✔
340
        }
341
        index = stop;
1✔
342
      }
1✔
343

344
      for (BufferEntry bufferEntry : list) {
1✔
345
        bufferEntry.runWith(substream);
1✔
346
        if (bufferEntry instanceof RetriableStream.StartEntry) {
1✔
347
          streamStarted = true;
1✔
348
        }
349
        savedState = state;
1✔
350
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
351
          // committed but not me, to be cancelled
352
          break;
1✔
353
        }
354
        if (savedState.cancelled) {
1✔
355
          break;
1✔
356
        }
357
      }
1✔
358
    }
1✔
359

360
    if (onReadyRunnable != null) {
1✔
361
      listenerSerializeExecutor.execute(onReadyRunnable);
1✔
362
      return;
1✔
363
    }
364

365
    if (!streamStarted) {
1✔
366
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
367
      substream.stream.start(new Sublistener(substream));
1✔
368
    }
369
    substream.stream.cancel(
1✔
370
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
1✔
371
  }
1✔
372

373
  /**
374
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
375
   */
376
  @CheckReturnValue
377
  @Nullable
378
  abstract Status prestart();
379

380
  class StartEntry implements BufferEntry {
1✔
381
    @Override
382
    public void runWith(Substream substream) {
383
      substream.stream.start(new Sublistener(substream));
1✔
384
    }
1✔
385
  }
386

387
  /** Starts the first RPC attempt. */
388
  @Override
389
  public final void start(ClientStreamListener listener) {
390
    masterListener = listener;
1✔
391

392
    Status shutdownStatus = prestart();
1✔
393

394
    if (shutdownStatus != null) {
1✔
395
      cancel(shutdownStatus);
1✔
396
      return;
1✔
397
    }
398

399
    synchronized (lock) {
1✔
400
      state.buffer.add(new StartEntry());
1✔
401
    }
1✔
402

403
    Substream substream = createSubstream(0, false, false);
1✔
404
    if (substream == null) {
1✔
405
      return;
×
406
    }
407
    if (isHedging) {
1✔
408
      FutureCanceller scheduledHedgingRef = null;
1✔
409

410
      synchronized (lock) {
1✔
411
        state = state.addActiveHedge(substream);
1✔
412
        if (hasPotentialHedging(state)
1✔
413
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
414
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
415
        }
416
      }
1✔
417

418
      if (scheduledHedgingRef != null) {
1✔
419
        scheduledHedgingRef.setFuture(
1✔
420
            scheduledExecutorService.schedule(
1✔
421
                new HedgingRunnable(scheduledHedgingRef),
422
                hedgingPolicy.hedgingDelayNanos,
423
                TimeUnit.NANOSECONDS));
424
      }
425
    }
426

427
    drain(substream);
1✔
428
  }
1✔
429

430
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
431
  private void pushbackHedging(@Nullable Integer delayMillis) {
432
    if (delayMillis == null) {
1✔
433
      return;
1✔
434
    }
435
    if (delayMillis < 0) {
1✔
436
      freezeHedging();
1✔
437
      return;
1✔
438
    }
439

440
    // Cancels the current scheduledHedging and reschedules a new one.
441
    FutureCanceller future;
442
    Future<?> futureToBeCancelled;
443

444
    synchronized (lock) {
1✔
445
      if (scheduledHedging == null) {
1✔
446
        return;
×
447
      }
448

449
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
450
      scheduledHedging = future = new FutureCanceller(lock);
1✔
451
    }
1✔
452

453
    if (futureToBeCancelled != null) {
1✔
454
      futureToBeCancelled.cancel(false);
1✔
455
    }
456
    future.setFuture(scheduledExecutorService.schedule(
1✔
457
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
458
  }
1✔
459

460
  private final class HedgingRunnable implements Runnable {
461

462
    // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
463
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
464
    final FutureCanceller scheduledHedgingRef;
465

466
    HedgingRunnable(FutureCanceller scheduledHedging) {
1✔
467
      scheduledHedgingRef = scheduledHedging;
1✔
468
    }
1✔
469

470
    @Override
471
    public void run() {
472
      // It's safe to read state.hedgingAttemptCount here.
473
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
474
      // until state.addActiveHedge() is called subsequently, even the state could possibly
475
      // change.
476
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false, true);
1✔
477
      if (newSubstream == null) {
1✔
478
        return;
×
479
      }
480
      callExecutor.execute(
1✔
481
          new Runnable() {
1✔
482
            @SuppressWarnings("GuardedBy")  //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
483
            @Override
484
            public void run() {
485
              boolean cancelled = false;
1✔
486
              FutureCanceller future = null;
1✔
487

488
              synchronized (lock) {
1✔
489
                if (scheduledHedgingRef.isCancelled()) {
1✔
490
                  cancelled = true;
×
491
                } else {
492
                  state = state.addActiveHedge(newSubstream);
1✔
493
                  if (hasPotentialHedging(state)
1✔
494
                      && (throttle == null || throttle.isAboveThreshold())) {
1✔
495
                    scheduledHedging = future = new FutureCanceller(lock);
1✔
496
                  } else {
497
                    state = state.freezeHedging();
1✔
498
                    scheduledHedging = null;
1✔
499
                  }
500
                }
501
              }
1✔
502

503
              if (cancelled) {
1✔
504
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
505
                newSubstream.stream.start(new Sublistener(newSubstream));
×
506
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
×
507
                return;
×
508
              }
509
              if (future != null) {
1✔
510
                future.setFuture(
1✔
511
                    scheduledExecutorService.schedule(
1✔
512
                        new HedgingRunnable(future),
513
                        hedgingPolicy.hedgingDelayNanos,
1✔
514
                        TimeUnit.NANOSECONDS));
515
              }
516
              drain(newSubstream);
1✔
517
            }
1✔
518
          });
519
    }
1✔
520
  }
521

522
  @Override
523
  public final void cancel(final Status reason) {
524
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
1✔
525
    noopSubstream.stream = new NoopClientStream();
1✔
526
    Runnable runnable = commit(noopSubstream);
1✔
527

528
    if (runnable != null) {
1✔
529
      synchronized (lock) {
1✔
530
        state = state.substreamDrained(noopSubstream);
1✔
531
      }
1✔
532
      runnable.run();
1✔
533
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
1✔
534
      return;
1✔
535
    }
536

537
    Substream winningSubstreamToCancel = null;
1✔
538
    synchronized (lock) {
1✔
539
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
1✔
540
        winningSubstreamToCancel = state.winningSubstream;
1✔
541
      } else { // the winningSubstream will be cancelled while draining
542
        cancellationStatus = reason;
1✔
543
      }
544
      state = state.cancelled();
1✔
545
    }
1✔
546
    if (winningSubstreamToCancel != null) {
1✔
547
      winningSubstreamToCancel.stream.cancel(reason);
1✔
548
    }
549
  }
1✔
550

551
  private void delayOrExecute(BufferEntry bufferEntry) {
552
    Collection<Substream> savedDrainedSubstreams;
553
    synchronized (lock) {
1✔
554
      if (!state.passThrough) {
1✔
555
        state.buffer.add(bufferEntry);
1✔
556
      }
557
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
558
    }
1✔
559

560
    for (Substream substream : savedDrainedSubstreams) {
1✔
561
      bufferEntry.runWith(substream);
1✔
562
    }
1✔
563
  }
1✔
564

565
  /**
566
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
567
   * InputStream for buffering.
568
   */
569
  @Override
570
  public final void writeMessage(InputStream message) {
571
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
×
572
  }
573

574
  final void sendMessage(final ReqT message) {
575
    State savedState = state;
1✔
576
    if (savedState.passThrough) {
1✔
577
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
578
      return;
1✔
579
    }
580

581
    class SendMessageEntry implements BufferEntry {
1✔
582
      @Override
583
      public void runWith(Substream substream) {
584
        substream.stream.writeMessage(method.streamRequest(message));
1✔
585
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
586
        // flushes (or half close), but retry appears to have a code path that the flushes may
587
        // not happen. The code needs to be fixed and this removed. See #9340.
588
        substream.stream.flush();
1✔
589
      }
1✔
590
    }
591

592
    delayOrExecute(new SendMessageEntry());
1✔
593
  }
1✔
594

595
  @Override
596
  public final void request(final int numMessages) {
597
    State savedState = state;
1✔
598
    if (savedState.passThrough) {
1✔
599
      savedState.winningSubstream.stream.request(numMessages);
1✔
600
      return;
1✔
601
    }
602

603
    class RequestEntry implements BufferEntry {
1✔
604
      @Override
605
      public void runWith(Substream substream) {
606
        substream.stream.request(numMessages);
1✔
607
      }
1✔
608
    }
609

610
    delayOrExecute(new RequestEntry());
1✔
611
  }
1✔
612

613
  @Override
614
  public final void flush() {
615
    State savedState = state;
1✔
616
    if (savedState.passThrough) {
1✔
617
      savedState.winningSubstream.stream.flush();
1✔
618
      return;
1✔
619
    }
620

621
    class FlushEntry implements BufferEntry {
1✔
622
      @Override
623
      public void runWith(Substream substream) {
624
        substream.stream.flush();
1✔
625
      }
1✔
626
    }
627

628
    delayOrExecute(new FlushEntry());
1✔
629
  }
1✔
630

631
  @Override
632
  public final boolean isReady() {
633
    for (Substream substream : state.drainedSubstreams) {
1✔
634
      if (substream.stream.isReady()) {
1✔
635
        return true;
1✔
636
      }
637
    }
1✔
638
    return false;
1✔
639
  }
640

641
  @Override
642
  public void optimizeForDirectExecutor() {
643
    class OptimizeDirectEntry implements BufferEntry {
1✔
644
      @Override
645
      public void runWith(Substream substream) {
646
        substream.stream.optimizeForDirectExecutor();
1✔
647
      }
1✔
648
    }
649

650
    delayOrExecute(new OptimizeDirectEntry());
1✔
651
  }
1✔
652

653
  @Override
654
  public final void setCompressor(final Compressor compressor) {
655
    class CompressorEntry implements BufferEntry {
1✔
656
      @Override
657
      public void runWith(Substream substream) {
658
        substream.stream.setCompressor(compressor);
1✔
659
      }
1✔
660
    }
661

662
    delayOrExecute(new CompressorEntry());
1✔
663
  }
1✔
664

665
  @Override
666
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
667
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
668
      @Override
669
      public void runWith(Substream substream) {
670
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
671
      }
1✔
672
    }
673

674
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
675
  }
1✔
676

677
  @Override
678
  public final void setMessageCompression(final boolean enable) {
679
    class MessageCompressionEntry implements BufferEntry {
1✔
680
      @Override
681
      public void runWith(Substream substream) {
682
        substream.stream.setMessageCompression(enable);
1✔
683
      }
1✔
684
    }
685

686
    delayOrExecute(new MessageCompressionEntry());
1✔
687
  }
1✔
688

689
  @Override
690
  public final void halfClose() {
691
    class HalfCloseEntry implements BufferEntry {
1✔
692
      @Override
693
      public void runWith(Substream substream) {
694
        substream.stream.halfClose();
1✔
695
      }
1✔
696
    }
697

698
    delayOrExecute(new HalfCloseEntry());
1✔
699
  }
1✔
700

701
  @Override
702
  public final void setAuthority(final String authority) {
703
    class AuthorityEntry implements BufferEntry {
1✔
704
      @Override
705
      public void runWith(Substream substream) {
706
        substream.stream.setAuthority(authority);
1✔
707
      }
1✔
708
    }
709

710
    delayOrExecute(new AuthorityEntry());
1✔
711
  }
1✔
712

713
  @Override
714
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
715
    class DecompressorRegistryEntry implements BufferEntry {
1✔
716
      @Override
717
      public void runWith(Substream substream) {
718
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
719
      }
1✔
720
    }
721

722
    delayOrExecute(new DecompressorRegistryEntry());
1✔
723
  }
1✔
724

725
  @Override
726
  public final void setMaxInboundMessageSize(final int maxSize) {
727
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
728
      @Override
729
      public void runWith(Substream substream) {
730
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
731
      }
1✔
732
    }
733

734
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
735
  }
1✔
736

737
  @Override
738
  public final void setMaxOutboundMessageSize(final int maxSize) {
739
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
740
      @Override
741
      public void runWith(Substream substream) {
742
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
743
      }
1✔
744
    }
745

746
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
747
  }
1✔
748

749
  @Override
750
  public final void setDeadline(final Deadline deadline) {
751
    class DeadlineEntry implements BufferEntry {
1✔
752
      @Override
753
      public void runWith(Substream substream) {
754
        substream.stream.setDeadline(deadline);
1✔
755
      }
1✔
756
    }
757

758
    delayOrExecute(new DeadlineEntry());
1✔
759
  }
1✔
760

761
  @Override
762
  public final Attributes getAttributes() {
763
    if (state.winningSubstream != null) {
1✔
764
      return state.winningSubstream.stream.getAttributes();
1✔
765
    }
766
    return Attributes.EMPTY;
×
767
  }
768

769
  @Override
770
  public void appendTimeoutInsight(InsightBuilder insight) {
771
    State currentState;
772
    synchronized (lock) {
1✔
773
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
774
      currentState = state;
1✔
775
    }
1✔
776
    if (currentState.winningSubstream != null) {
1✔
777
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
778
      // of the winning substream, they may not have received closed() notifications yet, thus they
779
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
780
      // We need to figure out how to include them.
781
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
782
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
783
      insight.appendKeyValue("committed", substreamInsight);
1✔
784
    } else {
1✔
785
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
786
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
787
      // and are still catching up with buffered requests (in other words, still draining) will not
788
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
789
      // be indistinguishable from the case where those streams are to be created a little late due
790
      // to delays in the timer.
791
      for (Substream sub : currentState.drainedSubstreams) {
1✔
792
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
793
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
794
        openSubstreamsInsight.append(substreamInsight);
1✔
795
      }
1✔
796
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
797
    }
798
  }
1✔
799

800
  private static Random random = new Random();
1✔
801

802
  @VisibleForTesting
803
  static void setRandom(Random random) {
804
    RetriableStream.random = random;
1✔
805
  }
1✔
806

807
  /**
808
   * Whether there is any potential hedge at the moment. A false return value implies there is
809
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
810
   * when calling this method, unless otherwise the rpc is committed.
811
   */
812
  // only called when isHedging is true
813
  @GuardedBy("lock")
814
  private boolean hasPotentialHedging(State state) {
815
    return state.winningSubstream == null
1✔
816
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
817
        && !state.hedgingFrozen;
818
  }
819

820
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
821
  private void freezeHedging() {
822
    Future<?> futureToBeCancelled = null;
1✔
823
    synchronized (lock) {
1✔
824
      if (scheduledHedging != null) {
1✔
825
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
826
        scheduledHedging = null;
1✔
827
      }
828
      state = state.freezeHedging();
1✔
829
    }
1✔
830

831
    if (futureToBeCancelled != null) {
1✔
832
      futureToBeCancelled.cancel(false);
1✔
833
    }
834
  }
1✔
835

836
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
837
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
838
        metadata);
839
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
840
      listenerSerializeExecutor.execute(
1✔
841
          new Runnable() {
1✔
842
            @Override
843
            public void run() {
844
              isClosed = true;
1✔
845
              masterListener.closed(status, progress, metadata);
1✔
846
            }
1✔
847
          });
848
    }
849
  }
1✔
850

851
  private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil
1✔
852
          .getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true);
1✔
853

854
  public static long intervalWithJitter(long intervalNanos) {
855
    double inverseJitterFactor = isExperimentalRetryJitterEnabled
1✔
856
            ? 0.8 * random.nextDouble() + 0.4 : random.nextDouble();
1✔
857
    return (long) (intervalNanos * inverseJitterFactor);
1✔
858
  }
859

860
  private static final class SavedCloseMasterListenerReason {
861
    private final Status status;
862
    private final RpcProgress progress;
863
    private final Metadata metadata;
864

865
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
866
      this.status = status;
1✔
867
      this.progress = progress;
1✔
868
      this.metadata = metadata;
1✔
869
    }
1✔
870
  }
871

872
  private interface BufferEntry {
873
    /** Replays the buffer entry with the given stream. */
874
    void runWith(Substream substream);
875
  }
876

877
  private final class Sublistener implements ClientStreamListener {
1✔
878
    final Substream substream;
879

880
    /**
     * Creates a listener bound to one substream.
     *
     * @param substream the substream whose events this listener receives
     */
    Sublistener(Substream substream) {
      this.substream = substream;
    }
1✔
883

884
    @Override
885
    public void headersRead(final Metadata headers) {
886
      if (substream.previousAttemptCount > 0) {
1✔
887
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
888
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
889
      }
890
      commitAndRun(substream);
1✔
891
      if (state.winningSubstream == substream) {
1✔
892
        if (throttle != null) {
1✔
893
          throttle.onSuccess();
1✔
894
        }
895
        listenerSerializeExecutor.execute(
1✔
896
            new Runnable() {
1✔
897
              @Override
898
              public void run() {
899
                masterListener.headersRead(headers);
1✔
900
              }
1✔
901
            });
902
      }
903
    }
1✔
904

905
    @Override
906
    public void closed(
907
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
908
      synchronized (lock) {
1✔
909
        state = state.substreamClosed(substream);
1✔
910
        closedSubstreamsInsight.append(status.getCode());
1✔
911
      }
1✔
912

913
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
914
        assert savedCloseMasterListenerReason != null;
1✔
915
        listenerSerializeExecutor.execute(
1✔
916
            new Runnable() {
1✔
917
              @Override
918
              public void run() {
919
                isClosed = true;
1✔
920
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
921
                    savedCloseMasterListenerReason.progress,
1✔
922
                    savedCloseMasterListenerReason.metadata);
1✔
923
              }
1✔
924
            });
925
        return;
1✔
926
      }
927

928
      // handle a race between buffer limit exceeded and closed, when setting
929
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
930
      if (substream.bufferLimitExceeded) {
1✔
931
        commitAndRun(substream);
1✔
932
        if (state.winningSubstream == substream) {
1✔
933
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
934
        }
935
        return;
1✔
936
      }
937
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
938
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
939
        commitAndRun(substream);
1✔
940
        if (state.winningSubstream == substream) {
1✔
941
          Status tooManyTransparentRetries = GrpcUtil.statusWithDetails(
1✔
942
              Status.Code.INTERNAL, "Too many transparent retries. Might be a bug in gRPC", status);
943
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
944
        }
945
        return;
1✔
946
      }
947

948
      if (state.winningSubstream == null) {
1✔
949
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
950
            || (rpcProgress == RpcProgress.REFUSED
951
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
952
          // transparent retry
953
          final Substream newSubstream = createSubstream(substream.previousAttemptCount,
1✔
954
              true, false);
955
          if (newSubstream == null) {
1✔
956
            return;
×
957
          }
958
          if (isHedging) {
1✔
959
            synchronized (lock) {
1✔
960
              // Although this operation is not done atomically with
961
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
962
              // activeHedges, so neither does it affect the commitment decision of other threads,
963
              // nor do the commitment decision making threads affect itself.
964
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
965
            }
1✔
966
          }
967

968
          callExecutor.execute(new Runnable() {
1✔
969
            @Override
970
            public void run() {
971
              drain(newSubstream);
1✔
972
            }
1✔
973
          });
974
          return;
1✔
975
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
976
          // For normal retry, nothing need be done here, will just commit.
977
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
978
          if (isHedging) {
1✔
979
            freezeHedging();
×
980
          }
981
        } else {
982
          noMoreTransparentRetry.set(true);
1✔
983

984
          if (isHedging) {
1✔
985
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
986
            if (hedgingPlan.isHedgeable) {
1✔
987
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
988
            }
989
            synchronized (lock) {
1✔
990
              state = state.removeActiveHedge(substream);
1✔
991
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
992
              // Once hasPotentialHedging(state) is false, it will always be false, and then
993
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
994
              // multiple concurrent hedges, one of the hedges will end up committed.
995
              if (hedgingPlan.isHedgeable) {
1✔
996
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
997
                  return;
1✔
998
                }
999
                // else, no activeHedges, no new hedges possible, try to commit
1000
              } // else, isHedgeable is false, try to commit
1001
            }
1✔
1002
          } else {
1✔
1003
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
1004
            if (retryPlan.shouldRetry) {
1✔
1005
              // retry
1006
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1,
1✔
1007
                  false, false);
1008
              if (newSubstream == null) {
1✔
1009
                return;
×
1010
              }
1011
              // The check state.winningSubstream == null, checking if is not already committed, is
1012
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1013
              FutureCanceller scheduledRetryCopy;
1014
              synchronized (lock) {
1✔
1015
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1016
              }
1✔
1017

1018
              class RetryBackoffRunnable implements Runnable {
1✔
1019
                @Override
1020
                @SuppressWarnings("FutureReturnValueIgnored")
1021
                public void run() {
1022
                  synchronized (scheduledRetryCopy.lock) {
1✔
1023
                    if (scheduledRetryCopy.isCancelled()) {
1✔
1024
                      return;
×
1025
                    } else {
1026
                      scheduledRetryCopy.markCancelled();
1✔
1027
                    }
1028
                  }
1✔
1029

1030
                  callExecutor.execute(
1✔
1031
                      new Runnable() {
1✔
1032
                        @Override
1033
                        public void run() {
1034
                          drain(newSubstream);
1✔
1035
                        }
1✔
1036
                      });
1037
                }
1✔
1038
              }
1039

1040
              scheduledRetryCopy.setFuture(
1✔
1041
                  scheduledExecutorService.schedule(
1✔
1042
                      new RetryBackoffRunnable(),
1043
                      retryPlan.backoffNanos,
1044
                      TimeUnit.NANOSECONDS));
1045
              return;
1✔
1046
            }
1047
          }
1048
        }
1049
      }
1050

1051
      commitAndRun(substream);
1✔
1052
      if (state.winningSubstream == substream) {
1✔
1053
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1054
      }
1055
    }
1✔
1056

1057
    /**
1058
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1059
     * long the backoff should be. The decision does not take the commitment status into account, so
1060
     * caller should check it separately. It also updates the throttle. It does not change state.
1061
     */
1062
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1063
      if (retryPolicy == null) {
1✔
1064
        return new RetryPlan(false, 0);
1✔
1065
      }
1066
      boolean shouldRetry = false;
1✔
1067
      long backoffNanos = 0L;
1✔
1068
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1069
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1070
      boolean isThrottled = false;
1✔
1071
      if (throttle != null) {
1✔
1072
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1073
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1074
        }
1075
      }
1076

1077
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1078
        if (pushbackMillis == null) {
1✔
1079
          if (isRetryableStatusCode) {
1✔
1080
            shouldRetry = true;
1✔
1081
            backoffNanos = intervalWithJitter(nextBackoffIntervalNanos);
1✔
1082
            nextBackoffIntervalNanos = Math.min(
1✔
1083
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1084
                retryPolicy.maxBackoffNanos);
1✔
1085
          } // else no retry
1086
        } else if (pushbackMillis >= 0) {
1✔
1087
          shouldRetry = true;
1✔
1088
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1089
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1090
        } // else no retry
1091
      } // else no retry
1092

1093
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1094
    }
1095

1096
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1097
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1098
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1099
      boolean isThrottled = false;
1✔
1100
      if (throttle != null) {
1✔
1101
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1102
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1103
        }
1104
      }
1105
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1106
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1107
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1108
      }
1109
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1110
    }
1111

1112
    @Nullable
1113
    private Integer getPushbackMills(Metadata trailer) {
1114
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1115
      Integer pushbackMillis = null;
1✔
1116
      if (pushbackStr != null) {
1✔
1117
        try {
1118
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1119
        } catch (NumberFormatException e) {
1✔
1120
          pushbackMillis = -1;
1✔
1121
        }
1✔
1122
      }
1123
      return pushbackMillis;
1✔
1124
    }
1125

1126
    @Override
1127
    public void messagesAvailable(final MessageProducer producer) {
1128
      State savedState = state;
1✔
1129
      checkState(
1✔
1130
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1131
      if (savedState.winningSubstream != substream) {
1✔
1132
        GrpcUtil.closeQuietly(producer);
1✔
1133
        return;
1✔
1134
      }
1135
      listenerSerializeExecutor.execute(
1✔
1136
          new Runnable() {
1✔
1137
            @Override
1138
            public void run() {
1139
              masterListener.messagesAvailable(producer);
1✔
1140
            }
1✔
1141
          });
1142
    }
1✔
1143

1144
    @Override
1145
    public void onReady() {
1146
      // FIXME(#7089): hedging case is broken.
1147
      if (!isReady()) {
1✔
1148
        return;
1✔
1149
      }
1150
      listenerSerializeExecutor.execute(
1✔
1151
          new Runnable() {
1✔
1152
            @Override
1153
            public void run() {
1154
              if (!isClosed) {
1✔
1155
                masterListener.onReady();
1✔
1156
              }
1157
            }
1✔
1158
          });
1159
    }
1✔
1160
  }
1161

1162
  private static final class State {
1163
    /** Committed and the winning substream drained. */
1164
    final boolean passThrough;
1165

1166
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1167
    @Nullable final List<BufferEntry> buffer;
1168

1169
    /**
1170
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1171
     * passThrough; Empty if committed but not passThrough.
1172
     */
1173
    final Collection<Substream> drainedSubstreams;
1174

1175
    /**
1176
     * Unmodifiable collection of all the active hedging substreams.
1177
     *
1178
     * <p>A substream even with the attribute substream.closed being true may be considered still
1179
     * "active" at the moment as long as it is in this collection.
1180
     */
1181
    final Collection<Substream> activeHedges; // not null once isHedging = true
1182

1183
    final int hedgingAttemptCount;
1184

1185
    /** Null until committed. */
1186
    @Nullable final Substream winningSubstream;
1187

1188
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1189
    final boolean cancelled;
1190

1191
    /** No more hedging due to events like drop or pushback. */
1192
    final boolean hedgingFrozen;
1193

1194
    State(
1195
        @Nullable List<BufferEntry> buffer,
1196
        Collection<Substream> drainedSubstreams,
1197
        Collection<Substream> activeHedges,
1198
        @Nullable Substream winningSubstream,
1199
        boolean cancelled,
1200
        boolean passThrough,
1201
        boolean hedgingFrozen,
1202
        int hedgingAttemptCount) {
1✔
1203
      this.buffer = buffer;
1✔
1204
      this.drainedSubstreams =
1✔
1205
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1206
      this.winningSubstream = winningSubstream;
1✔
1207
      this.activeHedges = activeHedges;
1✔
1208
      this.cancelled = cancelled;
1✔
1209
      this.passThrough = passThrough;
1✔
1210
      this.hedgingFrozen = hedgingFrozen;
1✔
1211
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1212

1213
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1214
      checkState(
1✔
1215
          !passThrough || winningSubstream != null,
1216
          "passThrough should imply winningSubstream != null");
1217
      checkState(
1✔
1218
          !passThrough
1219
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1220
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1221
          "passThrough should imply winningSubstream is drained");
1222
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1223
    }
1✔
1224

1225
    @CheckReturnValue
1226
    // GuardedBy RetriableStream.lock
1227
    State cancelled() {
1228
      return new State(
1✔
1229
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1230
          hedgingFrozen, hedgingAttemptCount);
1231
    }
1232

1233
    /** The given substream is drained. */
1234
    @CheckReturnValue
1235
    // GuardedBy RetriableStream.lock
1236
    State substreamDrained(Substream substream) {
1237
      checkState(!passThrough, "Already passThrough");
1✔
1238

1239
      Collection<Substream> drainedSubstreams;
1240
      
1241
      if (substream.closed) {
1✔
1242
        drainedSubstreams = this.drainedSubstreams;
1✔
1243
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1244
        // optimize for 0-retry, which is most of the cases.
1245
        drainedSubstreams = Collections.singletonList(substream);
1✔
1246
      } else {
1247
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1248
        drainedSubstreams.add(substream);
1✔
1249
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1250
      }
1251

1252
      boolean passThrough = winningSubstream != null;
1✔
1253

1254
      List<BufferEntry> buffer = this.buffer;
1✔
1255
      if (passThrough) {
1✔
1256
        checkState(
1✔
1257
            winningSubstream == substream, "Another RPC attempt has already committed");
1258
        buffer = null;
1✔
1259
      }
1260

1261
      return new State(
1✔
1262
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1263
          hedgingFrozen, hedgingAttemptCount);
1264
    }
1265

1266
    /** The given substream is closed. */
1267
    @CheckReturnValue
1268
    // GuardedBy RetriableStream.lock
1269
    State substreamClosed(Substream substream) {
1270
      substream.closed = true;
1✔
1271
      if (this.drainedSubstreams.contains(substream)) {
1✔
1272
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1273
        drainedSubstreams.remove(substream);
1✔
1274
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1275
        return new State(
1✔
1276
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1277
            hedgingFrozen, hedgingAttemptCount);
1278
      } else {
1279
        return this;
1✔
1280
      }
1281
    }
1282

1283
    @CheckReturnValue
1284
    // GuardedBy RetriableStream.lock
1285
    State committed(Substream winningSubstream) {
1286
      checkState(this.winningSubstream == null, "Already committed");
1✔
1287

1288
      boolean passThrough = false;
1✔
1289
      List<BufferEntry> buffer = this.buffer;
1✔
1290
      Collection<Substream> drainedSubstreams;
1291

1292
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1293
        passThrough = true;
1✔
1294
        buffer = null;
1✔
1295
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1296
      } else {
1297
        drainedSubstreams = Collections.emptyList();
1✔
1298
      }
1299

1300
      return new State(
1✔
1301
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1302
          hedgingFrozen, hedgingAttemptCount);
1303
    }
1304

1305
    @CheckReturnValue
1306
    // GuardedBy RetriableStream.lock
1307
    State freezeHedging() {
1308
      if (hedgingFrozen) {
1✔
1309
        return this;
×
1310
      }
1311
      return new State(
1✔
1312
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1313
          true, hedgingAttemptCount);
1314
    }
1315

1316
    @CheckReturnValue
1317
    // GuardedBy RetriableStream.lock
1318
    // state.hedgingAttemptCount is modified only here.
1319
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1320
    State addActiveHedge(Substream substream) {
1321
      // hasPotentialHedging must be true
1322
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1323
      checkState(winningSubstream == null, "already committed");
1✔
1324

1325
      Collection<Substream> activeHedges;
1326
      if (this.activeHedges == null) {
1✔
1327
        activeHedges = Collections.singleton(substream);
1✔
1328
      } else {
1329
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1330
        activeHedges.add(substream);
1✔
1331
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1332
      }
1333

1334
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1335
      return new State(
1✔
1336
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1337
          hedgingFrozen, hedgingAttemptCount);
1338
    }
1339

1340
    @CheckReturnValue
1341
    // GuardedBy RetriableStream.lock
1342
    // The method is only called in Sublistener.closed()
1343
    State removeActiveHedge(Substream substream) {
1344
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1345
      activeHedges.remove(substream);
1✔
1346
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1347

1348
      return new State(
1✔
1349
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1350
          hedgingFrozen, hedgingAttemptCount);
1351
    }
1352

1353
    @CheckReturnValue
1354
    // GuardedBy RetriableStream.lock
1355
    // The method is only called for transparent retry.
1356
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1357
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1358
      activeHedges.remove(oldOne);
1✔
1359
      activeHedges.add(newOne);
1✔
1360
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1361

1362
      return new State(
1✔
1363
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1364
          hedgingFrozen, hedgingAttemptCount);
1365
    }
1366
  }
1367

1368
  /**
1369
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1370
   *  attributes.
1371
   */
1372
  private static final class Substream {
1373
    ClientStream stream;
1374

1375
    // GuardedBy RetriableStream.lock
1376
    boolean closed;
1377

1378
    // setting to true must be GuardedBy RetriableStream.lock
1379
    boolean bufferLimitExceeded;
1380

1381
    final int previousAttemptCount;
1382

1383
    Substream(int previousAttemptCount) {
1✔
1384
      this.previousAttemptCount = previousAttemptCount;
1✔
1385
    }
1✔
1386
  }
1387

1388

1389
  /**
1390
   * Traces the buffer used by a substream.
1391
   */
1392
  class BufferSizeTracer extends ClientStreamTracer {
1393
    // Each buffer size tracer is dedicated to one specific substream.
1394
    private final Substream substream;
1395

1396
    @GuardedBy("lock")
1397
    long bufferNeeded;
1398

1399
    BufferSizeTracer(Substream substream) {
1✔
1400
      this.substream = substream;
1✔
1401
    }
1✔
1402

1403
    /**
1404
     * A message is sent to the wire, so its reference would be released if no retry or
1405
     * hedging were involved. So at this point we have to hold the reference of the message longer
1406
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1407
     */
1408
    @Override
1409
    public void outboundWireSize(long bytes) {
1410
      if (state.winningSubstream != null) {
1✔
1411
        return;
1✔
1412
      }
1413

1414
      Runnable postCommitTask = null;
1✔
1415

1416
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1417
      synchronized (lock) {
1✔
1418
        if (state.winningSubstream != null || substream.closed) {
1✔
1419
          return;
×
1420
        }
1421
        bufferNeeded += bytes;
1✔
1422
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1423
          return;
1✔
1424
        }
1425

1426
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1427
          substream.bufferLimitExceeded = true;
1✔
1428
        } else {
1429
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1430
          long savedChannelBufferUsed =
1✔
1431
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1432
          perRpcBufferUsed = bufferNeeded;
1✔
1433

1434
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1435
            substream.bufferLimitExceeded = true;
1✔
1436
          }
1437
        }
1438

1439
        if (substream.bufferLimitExceeded) {
1✔
1440
          postCommitTask = commit(substream);
1✔
1441
        }
1442
      }
1✔
1443

1444
      if (postCommitTask != null) {
1✔
1445
        postCommitTask.run();
1✔
1446
      }
1447
    }
1✔
1448
  }
1449

1450
  /**
1451
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1452
   *  the Channel. There should be a single instance of it for each channel.
1453
   */
1454
  static final class ChannelBufferMeter {
1✔
1455
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1456

1457
    @VisibleForTesting
1458
    long addAndGet(long newBytesUsed) {
1459
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1460
    }
1461
  }
1462

1463
  /**
1464
   * Used for retry throttling.
1465
   */
1466
  static final class Throttle {
1467

1468
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1469

1470
    /**
1471
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1472
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1473
     * maxTokens.
1474
     */
1475
    final int maxTokens;
1476

1477
    /**
1478
     * Half of {@code maxTokens}.
1479
     */
1480
    final int threshold;
1481

1482
    /**
1483
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1484
     */
1485
    final int tokenRatio;
1486

1487
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1488

1489
    Throttle(float maxTokens, float tokenRatio) {
1✔
1490
      // tokenRatio is up to 3 decimal places
1491
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1492
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1493
      this.threshold = this.maxTokens / 2;
1✔
1494
      tokenCount.set(this.maxTokens);
1✔
1495
    }
1✔
1496

1497
    @VisibleForTesting
1498
    boolean isAboveThreshold() {
1499
      return tokenCount.get() > threshold;
1✔
1500
    }
1501

1502
    /**
1503
     * Counts down the token on qualified failure and checks if it is above the threshold
1504
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1505
     * a not-to-retry pushback.
1506
     */
1507
    @VisibleForTesting
1508
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1509
      while (true) {
1510
        int currentCount = tokenCount.get();
1✔
1511
        if (currentCount == 0) {
1✔
1512
          return false;
1✔
1513
        }
1514
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1515
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1516
        if (updated) {
1✔
1517
          return decremented > threshold;
1✔
1518
        }
1519
      }
×
1520
    }
1521

1522
    @VisibleForTesting
1523
    void onSuccess() {
1524
      while (true) {
1525
        int currentCount = tokenCount.get();
1✔
1526
        if (currentCount == maxTokens) {
1✔
1527
          break;
1✔
1528
        }
1529
        int incremented = currentCount + tokenRatio;
1✔
1530
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1531
        if (updated) {
1✔
1532
          break;
1✔
1533
        }
1534
      }
×
1535
    }
1✔
1536

1537
    @Override
1538
    public boolean equals(Object o) {
1539
      if (this == o) {
1✔
1540
        return true;
×
1541
      }
1542
      if (!(o instanceof Throttle)) {
1✔
1543
        return false;
×
1544
      }
1545
      Throttle that = (Throttle) o;
1✔
1546
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1547
    }
1548

1549
    @Override
1550
    public int hashCode() {
1551
      return Objects.hashCode(maxTokens, tokenRatio);
×
1552
    }
1553
  }
1554

1555
  private static final class RetryPlan {
1556
    final boolean shouldRetry;
1557
    final long backoffNanos;
1558

1559
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1560
      this.shouldRetry = shouldRetry;
1✔
1561
      this.backoffNanos = backoffNanos;
1✔
1562
    }
1✔
1563
  }
1564

1565
  private static final class HedgingPlan {
1566
    final boolean isHedgeable;
1567
    @Nullable
1568
    final Integer hedgingPushbackMillis;
1569

1570
    public HedgingPlan(
1571
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1572
      this.isHedgeable = isHedgeable;
1✔
1573
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1574
    }
1✔
1575
  }
1576

1577
  /** Allows cancelling a Future without racing with setting the future. */
1578
  private static final class FutureCanceller {
1579

1580
    final Object lock;
1581
    @GuardedBy("lock")
1582
    Future<?> future;
1583
    @GuardedBy("lock")
1584
    boolean cancelled;
1585

1586
    FutureCanceller(Object lock) {
1✔
1587
      this.lock = lock;
1✔
1588
    }
1✔
1589

1590
    void setFuture(Future<?> future) {
1591
      boolean wasCancelled;
1592
      synchronized (lock) {
1✔
1593
        wasCancelled = cancelled;
1✔
1594
        if (!wasCancelled) {
1✔
1595
          this.future = future;
1✔
1596
        }
1597
      }
1✔
1598
      if (wasCancelled) {
1✔
1599
        future.cancel(false);
×
1600
      }
1601
    }
1✔
1602

1603
    @GuardedBy("lock")
1604
    @CheckForNull // Must cancel the returned future if not null.
1605
    Future<?> markCancelled() {
1606
      cancelled = true;
1✔
1607
      return future;
1✔
1608
    }
1609

1610
    @GuardedBy("lock")
1611
    boolean isCancelled() {
1612
      return cancelled;
1✔
1613
    }
1614
  }
1615
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc