• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19998

25 Sep 2025 07:59AM UTC coverage: 88.59% (+0.04%) from 88.554%
#19998

push

github

web-flow
xds: Handle wildcards in split patterns in DNS SAN exact matching (#12345)

Makes DNS SAN exact matching in xds handle wildcards in split patterns similar to Envoy.

Fixes #12326

34807 of 39290 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.07
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.Compressor;
30
import io.grpc.Deadline;
31
import io.grpc.DecompressorRegistry;
32
import io.grpc.Metadata;
33
import io.grpc.MethodDescriptor;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import java.io.InputStream;
38
import java.lang.Thread.UncaughtExceptionHandler;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.List;
43
import java.util.Random;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.concurrent.atomic.AtomicInteger;
50
import java.util.concurrent.atomic.AtomicLong;
51
import javax.annotation.CheckForNull;
52
import javax.annotation.Nullable;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  @VisibleForTesting
57
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
1✔
58
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
1✔
59

60
  @VisibleForTesting
61
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
1✔
62
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
1✔
63

64
  private static final Status CANCELLED_BECAUSE_COMMITTED =
1✔
65
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
1✔
66

67
  private final MethodDescriptor<ReqT, ?> method;
68
  private final Executor callExecutor;
69
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
1✔
70
      new UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          throw Status.fromThrowable(e)
×
74
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
×
75
              .asRuntimeException();
×
76
        }
77
      }
78
  );
79
  private final ScheduledExecutorService scheduledExecutorService;
80
  // Must not modify it.
81
  private final Metadata headers;
82
  @Nullable
83
  private final RetryPolicy retryPolicy;
84
  @Nullable
85
  private final HedgingPolicy hedgingPolicy;
86
  private final boolean isHedging;
87

88
  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89
  private final Object lock = new Object();
1✔
90

91
  private final ChannelBufferMeter channelBufferUsed;
92
  private final long perRpcBufferLimit;
93
  private final long channelBufferLimit;
94
  @Nullable
95
  private final Throttle throttle;
96
  @GuardedBy("lock")
1✔
97
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98

99
  private volatile State state = new State(
1✔
100
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
1✔
101
      false, 0);
102

103
  /**
104
   * Either non-local transparent retry happened or reached server's application logic.
105
   *
106
   * <p>Note that local-only transparent retries are unlimited.
107
   */
108
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
1✔
109
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
1✔
110
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
1✔
111
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112

113
  // Used for recording the share of buffer used for the current call out of the channel buffer.
114
  // This field would not be necessary if there is no channel buffer limit.
115
  @GuardedBy("lock")
116
  private long perRpcBufferUsed;
117

118
  private ClientStreamListener masterListener;
119
  @GuardedBy("lock")
120
  private FutureCanceller scheduledRetry;
121
  @GuardedBy("lock")
122
  private FutureCanceller scheduledHedging;
123
  private long nextBackoffIntervalNanos;
124
  private Status cancellationStatus;
125
  private boolean isClosed;
126

127
  /**
   * Creates a retriable stream for {@code method}.
   *
   * <p>At most one of {@code retryPolicy} and {@code hedgingPolicy} may be non-null; passing both
   * fails the {@code checkArgument} below. Hedging mode is enabled exactly when
   * {@code hedgingPolicy} is non-null.
   *
   * @param headers the caller's headers; stored as-is and copied per attempt by
   *     {@link #updateHeaders}
   * @param retryPolicy when non-null, its {@code initialBackoffNanos} seeds the first backoff
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method, Metadata headers,
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
      @Nullable Throttle throttle) {
    // Call identity and execution resources.
    this.method = method;
    this.headers = headers;
    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;

    // Buffer accounting.
    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;

    // Retry / hedging configuration.
    this.retryPolicy = retryPolicy;
    this.hedgingPolicy = hedgingPolicy;
    this.throttle = throttle;
    this.isHedging = hedgingPolicy != null;
    if (retryPolicy != null) {
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
    }

    checkArgument(
        retryPolicy == null || hedgingPolicy == null,
        "Should not provide both retryPolicy and hedgingPolicy");
  }
151

152
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153
  @Nullable // null if already committed
154
  @CheckReturnValue
155
  private Runnable commit(final Substream winningSubstream) {
156
    synchronized (lock) {
1✔
157
      if (state.winningSubstream != null) {
1✔
158
        return null;
1✔
159
      }
160
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
1✔
161

162
      state = state.committed(winningSubstream);
1✔
163

164
      // subtract the share of this RPC from channelBufferUsed.
165
      channelBufferUsed.addAndGet(-perRpcBufferUsed);
1✔
166

167
      final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
1✔
168
      final Future<?> retryFuture;
169
      if (scheduledRetry != null) {
1✔
170
        retryFuture = scheduledRetry.markCancelled();
1✔
171
        scheduledRetry = null;
1✔
172
      } else {
173
        retryFuture = null;
1✔
174
      }
175
      // cancel the scheduled hedging if it is scheduled prior to the commitment
176
      final Future<?> hedgingFuture;
177
      if (scheduledHedging != null) {
1✔
178
        hedgingFuture = scheduledHedging.markCancelled();
1✔
179
        scheduledHedging = null;
1✔
180
      } else {
181
        hedgingFuture = null;
1✔
182
      }
183

184
      class CommitTask implements Runnable {
1✔
185
        @Override
186
        public void run() {
187
          // For hedging only, not needed for normal retry
188
          for (Substream substream : savedDrainedSubstreams) {
1✔
189
            if (substream != winningSubstream) {
1✔
190
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
1✔
191
            }
192
          }
1✔
193
          if (retryFuture != null) {
1✔
194
            retryFuture.cancel(false);
1✔
195
            if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
196
              assert savedCloseMasterListenerReason != null;
×
197
              listenerSerializeExecutor.execute(
×
198
                  new Runnable() {
×
199
                    @Override
200
                    public void run() {
201
                      isClosed = true;
×
202
                      masterListener.closed(savedCloseMasterListenerReason.status,
×
203
                          savedCloseMasterListenerReason.progress,
×
204
                          savedCloseMasterListenerReason.metadata);
×
205
                    }
×
206
                  });
207
            }
208
          }
209

210
          if (hedgingFuture != null) {
1✔
211
            hedgingFuture.cancel(false);
1✔
212
          }
213

214
          postCommit();
1✔
215
        }
1✔
216
      }
217

218
      return new CommitTask();
1✔
219
    }
220
  }
221

222
  /**
   * Hook invoked as the last step of the task returned by {@code commit()}, after the non-winning
   * drained substreams and any pending retry/hedging futures have been cancelled.
   */
  abstract void postCommit();
223

224
  /**
225
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
226
   * for only once. The post commit task cancels other non-winning streams on separate transport
227
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
228
   * transports.(issues/10314)
229
   * This method should be called only in subListener callbacks. This guarantees callExecutor
230
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
231
   * decorative. That is because:
232
   * For a successful winning stream, other streams won't attempt to close master listener.
233
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
234
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
235
   * callExecutor scheduling happens-before that.
236
   */
237
  private void commitAndRun(Substream winningSubstream) {
238
    Runnable postCommitTask = commit(winningSubstream);
1✔
239

240
    if (postCommitTask != null) {
1✔
241
      callExecutor.execute(postCommitTask);
1✔
242
    }
243
  }
1✔
244

245
  // returns null means we should not create new sub streams, e.g. cancelled or
246
  // other close condition is met for retriableStream.
247
  @Nullable
248
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry,
249
                                    boolean isHedgedStream) {
250
    int inFlight;
251
    do {
252
      inFlight = inFlightSubStreams.get();
1✔
253
      if (inFlight < 0) {
1✔
254
        return null;
×
255
      }
256
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
257
    Substream sub = new Substream(previousAttemptCount);
1✔
258
    // one tracer per substream
259
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
260
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
261
      @Override
262
      public ClientStreamTracer newClientStreamTracer(
263
          ClientStreamTracer.StreamInfo info, Metadata headers) {
264
        return bufferSizeTracer;
1✔
265
      }
266
    };
267

268
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
269
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
270
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry,
1✔
271
        isHedgedStream);
272
    return sub;
1✔
273
  }
274

275
  /**
276
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
277
   * Client stream is not yet started.
278
   */
279
  abstract ClientStream newSubstream(
280
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
281
      boolean isTransparentRetry, boolean isHedgedStream);
282

283
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
284
  @VisibleForTesting
285
  final Metadata updateHeaders(
286
      Metadata originalHeaders, int previousAttemptCount) {
287
    Metadata newHeaders = new Metadata();
1✔
288
    newHeaders.merge(originalHeaders);
1✔
289
    if (previousAttemptCount > 0) {
1✔
290
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
291
    }
292
    return newHeaders;
1✔
293
  }
294

295
  private void drain(Substream substream) {
296
    int index = 0;
1✔
297
    int chunk = 0x80;
1✔
298
    List<BufferEntry> list = null;
1✔
299
    boolean streamStarted = false;
1✔
300
    Runnable onReadyRunnable = null;
1✔
301

302
    while (true) {
303
      State savedState;
304

305
      synchronized (lock) {
1✔
306
        savedState = state;
1✔
307
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
308
          // committed but not me, to be cancelled
309
          break;
1✔
310
        }
311
        if (savedState.cancelled) {
1✔
312
          break;
1✔
313
        }
314
        if (index == savedState.buffer.size()) { // I'm drained
1✔
315
          state = savedState.substreamDrained(substream);
1✔
316
          if (!isReady()) {
1✔
317
            return;
1✔
318
          }
319
          onReadyRunnable = new Runnable() {
1✔
320
            @Override
321
            public void run() {
322
              if (!isClosed) {
1✔
323
                masterListener.onReady();
1✔
324
              }
325
            }
1✔
326
          };
327
          break;
1✔
328
        }
329

330
        if (substream.closed) {
1✔
331
          return;
×
332
        }
333

334
        int stop = Math.min(index + chunk, savedState.buffer.size());
1✔
335
        if (list == null) {
1✔
336
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
1✔
337
        } else {
338
          list.clear();
1✔
339
          list.addAll(savedState.buffer.subList(index, stop));
1✔
340
        }
341
        index = stop;
1✔
342
      }
1✔
343

344
      for (BufferEntry bufferEntry : list) {
1✔
345
        bufferEntry.runWith(substream);
1✔
346
        if (bufferEntry instanceof RetriableStream.StartEntry) {
1✔
347
          streamStarted = true;
1✔
348
        }
349
        savedState = state;
1✔
350
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
351
          // committed but not me, to be cancelled
352
          break;
1✔
353
        }
354
        if (savedState.cancelled) {
1✔
355
          break;
1✔
356
        }
357
      }
1✔
358
    }
1✔
359

360
    if (onReadyRunnable != null) {
1✔
361
      listenerSerializeExecutor.execute(onReadyRunnable);
1✔
362
      return;
1✔
363
    }
364

365
    if (!streamStarted) {
1✔
366
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
367
      substream.stream.start(new Sublistener(substream));
1✔
368
    }
369
    substream.stream.cancel(
1✔
370
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
1✔
371
  }
1✔
372

373
  /**
374
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
375
   */
376
  @CheckReturnValue
377
  @Nullable
378
  abstract Status prestart();
379

380
  class StartEntry implements BufferEntry {
1✔
381
    @Override
382
    public void runWith(Substream substream) {
383
      substream.stream.start(new Sublistener(substream));
1✔
384
    }
1✔
385
  }
386

387
  /** Starts the first RPC attempt. */
388
  @Override
389
  public final void start(ClientStreamListener listener) {
390
    masterListener = listener;
1✔
391

392
    Status shutdownStatus = prestart();
1✔
393

394
    if (shutdownStatus != null) {
1✔
395
      cancel(shutdownStatus);
1✔
396
      return;
1✔
397
    }
398

399
    synchronized (lock) {
1✔
400
      state.buffer.add(new StartEntry());
1✔
401
    }
1✔
402

403
    Substream substream = createSubstream(0, false, false);
1✔
404
    if (substream == null) {
1✔
405
      return;
×
406
    }
407
    if (isHedging) {
1✔
408
      FutureCanceller scheduledHedgingRef = null;
1✔
409

410
      synchronized (lock) {
1✔
411
        state = state.addActiveHedge(substream);
1✔
412
        if (hasPotentialHedging(state)
1✔
413
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
414
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
415
        }
416
      }
1✔
417

418
      if (scheduledHedgingRef != null) {
1✔
419
        scheduledHedgingRef.setFuture(
1✔
420
            scheduledExecutorService.schedule(
1✔
421
                new HedgingRunnable(scheduledHedgingRef),
422
                hedgingPolicy.hedgingDelayNanos,
423
                TimeUnit.NANOSECONDS));
424
      }
425
    }
426

427
    drain(substream);
1✔
428
  }
1✔
429

430
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
431
  private void pushbackHedging(@Nullable Integer delayMillis) {
432
    if (delayMillis == null) {
1✔
433
      return;
1✔
434
    }
435
    if (delayMillis < 0) {
1✔
436
      freezeHedging();
1✔
437
      return;
1✔
438
    }
439

440
    // Cancels the current scheduledHedging and reschedules a new one.
441
    FutureCanceller future;
442
    Future<?> futureToBeCancelled;
443

444
    synchronized (lock) {
1✔
445
      if (scheduledHedging == null) {
1✔
446
        return;
×
447
      }
448

449
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
450
      scheduledHedging = future = new FutureCanceller(lock);
1✔
451
    }
1✔
452

453
    if (futureToBeCancelled != null) {
1✔
454
      futureToBeCancelled.cancel(false);
1✔
455
    }
456
    future.setFuture(scheduledExecutorService.schedule(
1✔
457
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
458
  }
1✔
459

460
  private final class HedgingRunnable implements Runnable {
461

462
    // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
463
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
464
    final FutureCanceller scheduledHedgingRef;
465

466
    HedgingRunnable(FutureCanceller scheduledHedging) {
1✔
467
      scheduledHedgingRef = scheduledHedging;
1✔
468
    }
1✔
469

470
    @Override
471
    public void run() {
472
      // It's safe to read state.hedgingAttemptCount here.
473
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
474
      // until state.addActiveHedge() is called subsequently, even the state could possibly
475
      // change.
476
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false, true);
1✔
477
      if (newSubstream == null) {
1✔
478
        return;
×
479
      }
480
      callExecutor.execute(
1✔
481
          new Runnable() {
1✔
482
            @SuppressWarnings("GuardedBy")  //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
483
            @Override
484
            public void run() {
485
              boolean cancelled = false;
1✔
486
              FutureCanceller future = null;
1✔
487

488
              synchronized (lock) {
1✔
489
                if (scheduledHedgingRef.isCancelled()) {
1✔
490
                  cancelled = true;
×
491
                } else {
492
                  state = state.addActiveHedge(newSubstream);
1✔
493
                  if (hasPotentialHedging(state)
1✔
494
                      && (throttle == null || throttle.isAboveThreshold())) {
1✔
495
                    scheduledHedging = future = new FutureCanceller(lock);
1✔
496
                  } else {
497
                    state = state.freezeHedging();
1✔
498
                    scheduledHedging = null;
1✔
499
                  }
500
                }
501
              }
1✔
502

503
              if (cancelled) {
1✔
504
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
505
                newSubstream.stream.start(new Sublistener(newSubstream));
×
506
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
×
507
                return;
×
508
              }
509
              if (future != null) {
1✔
510
                future.setFuture(
1✔
511
                    scheduledExecutorService.schedule(
1✔
512
                        new HedgingRunnable(future),
513
                        hedgingPolicy.hedgingDelayNanos,
1✔
514
                        TimeUnit.NANOSECONDS));
515
              }
516
              drain(newSubstream);
1✔
517
            }
1✔
518
          });
519
    }
1✔
520
  }
521

522
  @Override
523
  public final void cancel(final Status reason) {
524
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
1✔
525
    noopSubstream.stream = new NoopClientStream();
1✔
526
    Runnable runnable = commit(noopSubstream);
1✔
527

528
    if (runnable != null) {
1✔
529
      synchronized (lock) {
1✔
530
        state = state.substreamDrained(noopSubstream);
1✔
531
      }
1✔
532
      runnable.run();
1✔
533
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
1✔
534
      return;
1✔
535
    }
536

537
    Substream winningSubstreamToCancel = null;
1✔
538
    synchronized (lock) {
1✔
539
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
1✔
540
        winningSubstreamToCancel = state.winningSubstream;
1✔
541
      } else { // the winningSubstream will be cancelled while draining
542
        cancellationStatus = reason;
1✔
543
      }
544
      state = state.cancelled();
1✔
545
    }
1✔
546
    if (winningSubstreamToCancel != null) {
1✔
547
      winningSubstreamToCancel.stream.cancel(reason);
1✔
548
    }
549
  }
1✔
550

551
  private void delayOrExecute(BufferEntry bufferEntry) {
552
    Collection<Substream> savedDrainedSubstreams;
553
    synchronized (lock) {
1✔
554
      if (!state.passThrough) {
1✔
555
        state.buffer.add(bufferEntry);
1✔
556
      }
557
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
558
    }
1✔
559

560
    for (Substream substream : savedDrainedSubstreams) {
1✔
561
      bufferEntry.runWith(substream);
1✔
562
    }
1✔
563
  }
1✔
564

565
  /**
566
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
567
   * InputStream for buffering.
568
   */
569
  @Override
570
  public final void writeMessage(InputStream message) {
571
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
×
572
  }
573

574
  final void sendMessage(final ReqT message) {
575
    State savedState = state;
1✔
576
    if (savedState.passThrough) {
1✔
577
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
578
      return;
1✔
579
    }
580

581
    class SendMessageEntry implements BufferEntry {
1✔
582
      @Override
583
      public void runWith(Substream substream) {
584
        substream.stream.writeMessage(method.streamRequest(message));
1✔
585
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
586
        // flushes (or half close), but retry appears to have a code path that the flushes may
587
        // not happen. The code needs to be fixed and this removed. See #9340.
588
        substream.stream.flush();
1✔
589
      }
1✔
590
    }
591

592
    delayOrExecute(new SendMessageEntry());
1✔
593
  }
1✔
594

595
  @Override
596
  public final void request(final int numMessages) {
597
    State savedState = state;
1✔
598
    if (savedState.passThrough) {
1✔
599
      savedState.winningSubstream.stream.request(numMessages);
1✔
600
      return;
1✔
601
    }
602

603
    class RequestEntry implements BufferEntry {
1✔
604
      @Override
605
      public void runWith(Substream substream) {
606
        substream.stream.request(numMessages);
1✔
607
      }
1✔
608
    }
609

610
    delayOrExecute(new RequestEntry());
1✔
611
  }
1✔
612

613
  @Override
614
  public final void flush() {
615
    State savedState = state;
1✔
616
    if (savedState.passThrough) {
1✔
617
      savedState.winningSubstream.stream.flush();
1✔
618
      return;
1✔
619
    }
620

621
    class FlushEntry implements BufferEntry {
1✔
622
      @Override
623
      public void runWith(Substream substream) {
624
        substream.stream.flush();
1✔
625
      }
1✔
626
    }
627

628
    delayOrExecute(new FlushEntry());
1✔
629
  }
1✔
630

631
  @Override
632
  public final boolean isReady() {
633
    for (Substream substream : state.drainedSubstreams) {
1✔
634
      if (substream.stream.isReady()) {
1✔
635
        return true;
1✔
636
      }
637
    }
1✔
638
    return false;
1✔
639
  }
640

641
  @Override
642
  public void optimizeForDirectExecutor() {
643
    class OptimizeDirectEntry implements BufferEntry {
1✔
644
      @Override
645
      public void runWith(Substream substream) {
646
        substream.stream.optimizeForDirectExecutor();
1✔
647
      }
1✔
648
    }
649

650
    delayOrExecute(new OptimizeDirectEntry());
1✔
651
  }
1✔
652

653
  @Override
654
  public final void setCompressor(final Compressor compressor) {
655
    class CompressorEntry implements BufferEntry {
1✔
656
      @Override
657
      public void runWith(Substream substream) {
658
        substream.stream.setCompressor(compressor);
1✔
659
      }
1✔
660
    }
661

662
    delayOrExecute(new CompressorEntry());
1✔
663
  }
1✔
664

665
  @Override
666
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
667
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
668
      @Override
669
      public void runWith(Substream substream) {
670
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
671
      }
1✔
672
    }
673

674
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
675
  }
1✔
676

677
  @Override
678
  public final void setMessageCompression(final boolean enable) {
679
    class MessageCompressionEntry implements BufferEntry {
1✔
680
      @Override
681
      public void runWith(Substream substream) {
682
        substream.stream.setMessageCompression(enable);
1✔
683
      }
1✔
684
    }
685

686
    delayOrExecute(new MessageCompressionEntry());
1✔
687
  }
1✔
688

689
  @Override
690
  public final void halfClose() {
691
    class HalfCloseEntry implements BufferEntry {
1✔
692
      @Override
693
      public void runWith(Substream substream) {
694
        substream.stream.halfClose();
1✔
695
      }
1✔
696
    }
697

698
    delayOrExecute(new HalfCloseEntry());
1✔
699
  }
1✔
700

701
  @Override
702
  public final void setAuthority(final String authority) {
703
    class AuthorityEntry implements BufferEntry {
1✔
704
      @Override
705
      public void runWith(Substream substream) {
706
        substream.stream.setAuthority(authority);
1✔
707
      }
1✔
708
    }
709

710
    delayOrExecute(new AuthorityEntry());
1✔
711
  }
1✔
712

713
  @Override
714
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
715
    class DecompressorRegistryEntry implements BufferEntry {
1✔
716
      @Override
717
      public void runWith(Substream substream) {
718
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
719
      }
1✔
720
    }
721

722
    delayOrExecute(new DecompressorRegistryEntry());
1✔
723
  }
1✔
724

725
  @Override
726
  public final void setMaxInboundMessageSize(final int maxSize) {
727
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
728
      @Override
729
      public void runWith(Substream substream) {
730
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
731
      }
1✔
732
    }
733

734
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
735
  }
1✔
736

737
  @Override
738
  public final void setMaxOutboundMessageSize(final int maxSize) {
739
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
740
      @Override
741
      public void runWith(Substream substream) {
742
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
743
      }
1✔
744
    }
745

746
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
747
  }
1✔
748

749
  @Override
750
  public final void setDeadline(final Deadline deadline) {
751
    class DeadlineEntry implements BufferEntry {
1✔
752
      @Override
753
      public void runWith(Substream substream) {
754
        substream.stream.setDeadline(deadline);
1✔
755
      }
1✔
756
    }
757

758
    delayOrExecute(new DeadlineEntry());
1✔
759
  }
1✔
760

761
  @Override
762
  public final Attributes getAttributes() {
763
    if (state.winningSubstream != null) {
1✔
764
      return state.winningSubstream.stream.getAttributes();
1✔
765
    }
766
    return Attributes.EMPTY;
×
767
  }
768

769
  @Override
770
  public void appendTimeoutInsight(InsightBuilder insight) {
771
    State currentState;
772
    synchronized (lock) {
1✔
773
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
774
      currentState = state;
1✔
775
    }
1✔
776
    if (currentState.winningSubstream != null) {
1✔
777
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
778
      // of the winning substream, they may not have received closed() notifications yet, thus they
779
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
780
      // We need to figure out how to include them.
781
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
782
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
783
      insight.appendKeyValue("committed", substreamInsight);
1✔
784
    } else {
1✔
785
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
786
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
787
      // and are still catching up with buffered requests (in other words, still draining) will not
788
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
789
      // be indistinguishable from the case where those streams are to be created a little late due
790
      // to delays in the timer.
791
      for (Substream sub : currentState.drainedSubstreams) {
1✔
792
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
793
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
794
        openSubstreamsInsight.append(substreamInsight);
1✔
795
      }
1✔
796
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
797
    }
798
  }
1✔
799

800
  private static Random random = new Random();
1✔
801

802
  @VisibleForTesting
803
  static void setRandom(Random random) {
804
    RetriableStream.random = random;
1✔
805
  }
1✔
806

807
  /**
808
   * Whether there is any potential hedge at the moment. A false return value implies there is
809
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
810
   * when calling this method, unless otherwise the rpc is committed.
811
   */
812
  // only called when isHedging is true
813
  @GuardedBy("lock")
814
  private boolean hasPotentialHedging(State state) {
815
    return state.winningSubstream == null
1✔
816
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
817
        && !state.hedgingFrozen;
818
  }
819

820
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
821
  private void freezeHedging() {
822
    Future<?> futureToBeCancelled = null;
1✔
823
    synchronized (lock) {
1✔
824
      if (scheduledHedging != null) {
1✔
825
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
826
        scheduledHedging = null;
1✔
827
      }
828
      state = state.freezeHedging();
1✔
829
    }
1✔
830

831
    if (futureToBeCancelled != null) {
1✔
832
      futureToBeCancelled.cancel(false);
1✔
833
    }
834
  }
1✔
835

836
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
837
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
838
        metadata);
839
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
840
      listenerSerializeExecutor.execute(
1✔
841
          new Runnable() {
1✔
842
            @Override
843
            public void run() {
844
              isClosed = true;
1✔
845
              masterListener.closed(status, progress, metadata);
1✔
846
            }
1✔
847
          });
848
    }
849
  }
1✔
850

851
  private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil
1✔
852
          .getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true);
1✔
853

854
  public static long intervalWithJitter(long intervalNanos) {
855
    double inverseJitterFactor = isExperimentalRetryJitterEnabled
1✔
856
            ? 0.8 * random.nextDouble() + 0.4 : random.nextDouble();
1✔
857
    return (long) (intervalNanos * inverseJitterFactor);
1✔
858
  }
859

860
  private static final class SavedCloseMasterListenerReason {
861
    private final Status status;
862
    private final RpcProgress progress;
863
    private final Metadata metadata;
864

865
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
866
      this.status = status;
1✔
867
      this.progress = progress;
1✔
868
      this.metadata = metadata;
1✔
869
    }
1✔
870
  }
871

872
  /** A buffered operation that can be replayed against a {@link Substream}. */
  private interface BufferEntry {
873
    /** Replays the buffer entry with the given stream. */
874
    void runWith(Substream substream);
875
  }
876

877
  private final class Sublistener implements ClientStreamListener {
1✔
878
    final Substream substream;
879

880
    /** Creates a listener that keeps the given substream in {@code substream}. */
    Sublistener(Substream substream) {
1✔
881
      this.substream = substream;
1✔
882
    }
1✔
883

884
    @Override
885
    public void headersRead(final Metadata headers) {
886
      if (substream.previousAttemptCount > 0) {
1✔
887
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
888
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
889
      }
890
      commitAndRun(substream);
1✔
891
      if (state.winningSubstream == substream) {
1✔
892
        if (throttle != null) {
1✔
893
          throttle.onSuccess();
1✔
894
        }
895
        listenerSerializeExecutor.execute(
1✔
896
            new Runnable() {
1✔
897
              @Override
898
              public void run() {
899
                masterListener.headersRead(headers);
1✔
900
              }
1✔
901
            });
902
      }
903
    }
1✔
904

905
    @Override
906
    public void closed(
907
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
908
      synchronized (lock) {
1✔
909
        state = state.substreamClosed(substream);
1✔
910
        closedSubstreamsInsight.append(status.getCode());
1✔
911
      }
1✔
912

913
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
914
        assert savedCloseMasterListenerReason != null;
1✔
915
        listenerSerializeExecutor.execute(
1✔
916
            new Runnable() {
1✔
917
              @Override
918
              public void run() {
919
                isClosed = true;
1✔
920
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
921
                    savedCloseMasterListenerReason.progress,
1✔
922
                    savedCloseMasterListenerReason.metadata);
1✔
923
              }
1✔
924
            });
925
        return;
1✔
926
      }
927

928
      // handle a race between buffer limit exceeded and closed, when setting
929
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
930
      if (substream.bufferLimitExceeded) {
1✔
931
        commitAndRun(substream);
1✔
932
        if (state.winningSubstream == substream) {
1✔
933
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
934
        }
935
        return;
1✔
936
      }
937
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
938
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
939
        commitAndRun(substream);
1✔
940
        if (state.winningSubstream == substream) {
1✔
941
          Status tooManyTransparentRetries = Status.INTERNAL
1✔
942
              .withDescription("Too many transparent retries. Might be a bug in gRPC")
1✔
943
              .withCause(status.asRuntimeException());
1✔
944
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
945
        }
946
        return;
1✔
947
      }
948

949
      if (state.winningSubstream == null) {
1✔
950
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
951
            || (rpcProgress == RpcProgress.REFUSED
952
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
953
          // transparent retry
954
          final Substream newSubstream = createSubstream(substream.previousAttemptCount,
1✔
955
              true, false);
956
          if (newSubstream == null) {
1✔
957
            return;
×
958
          }
959
          if (isHedging) {
1✔
960
            synchronized (lock) {
1✔
961
              // Although this operation is not done atomically with
962
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
963
              // activeHedges, so neither does it affect the commitment decision of other threads,
964
              // nor do the commitment decision making threads affect itself.
965
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
966
            }
1✔
967
          }
968

969
          callExecutor.execute(new Runnable() {
1✔
970
            @Override
971
            public void run() {
972
              drain(newSubstream);
1✔
973
            }
1✔
974
          });
975
          return;
1✔
976
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
977
          // For normal retry, nothing need be done here, will just commit.
978
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
979
          if (isHedging) {
1✔
980
            freezeHedging();
×
981
          }
982
        } else {
983
          noMoreTransparentRetry.set(true);
1✔
984

985
          if (isHedging) {
1✔
986
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
987
            if (hedgingPlan.isHedgeable) {
1✔
988
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
989
            }
990
            synchronized (lock) {
1✔
991
              state = state.removeActiveHedge(substream);
1✔
992
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
993
              // Once hasPotentialHedging(state) is false, it will always be false, and then
994
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
995
              // multiple concurrent hedges, one of the hedges will end up committed.
996
              if (hedgingPlan.isHedgeable) {
1✔
997
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
998
                  return;
1✔
999
                }
1000
                // else, no activeHedges, no new hedges possible, try to commit
1001
              } // else, isHedgeable is false, try to commit
1002
            }
1✔
1003
          } else {
1✔
1004
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
1005
            if (retryPlan.shouldRetry) {
1✔
1006
              // retry
1007
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1,
1✔
1008
                  false, false);
1009
              if (newSubstream == null) {
1✔
1010
                return;
×
1011
              }
1012
              // The check state.winningSubstream == null, checking if is not already committed, is
1013
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1014
              FutureCanceller scheduledRetryCopy;
1015
              synchronized (lock) {
1✔
1016
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1017
              }
1✔
1018

1019
              class RetryBackoffRunnable implements Runnable {
1✔
1020
                @Override
1021
                @SuppressWarnings("FutureReturnValueIgnored")
1022
                public void run() {
1023
                  synchronized (scheduledRetryCopy.lock) {
1✔
1024
                    if (scheduledRetryCopy.isCancelled()) {
1✔
1025
                      return;
×
1026
                    } else {
1027
                      scheduledRetryCopy.markCancelled();
1✔
1028
                    }
1029
                  }
1✔
1030

1031
                  callExecutor.execute(
1✔
1032
                      new Runnable() {
1✔
1033
                        @Override
1034
                        public void run() {
1035
                          drain(newSubstream);
1✔
1036
                        }
1✔
1037
                      });
1038
                }
1✔
1039
              }
1040

1041
              scheduledRetryCopy.setFuture(
1✔
1042
                  scheduledExecutorService.schedule(
1✔
1043
                      new RetryBackoffRunnable(),
1044
                      retryPlan.backoffNanos,
1045
                      TimeUnit.NANOSECONDS));
1046
              return;
1✔
1047
            }
1048
          }
1049
        }
1050
      }
1051

1052
      commitAndRun(substream);
1✔
1053
      if (state.winningSubstream == substream) {
1✔
1054
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1055
      }
1056
    }
1✔
1057

1058
    /**
1059
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1060
     * long the backoff should be. The decision does not take the commitment status into account, so
1061
     * caller should check it separately. It also updates the throttle. It does not change state.
1062
     */
1063
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1064
      if (retryPolicy == null) {
1✔
1065
        return new RetryPlan(false, 0);
1✔
1066
      }
1067
      boolean shouldRetry = false;
1✔
1068
      long backoffNanos = 0L;
1✔
1069
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1070
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1071
      boolean isThrottled = false;
1✔
1072
      if (throttle != null) {
1✔
1073
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1074
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1075
        }
1076
      }
1077

1078
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1079
        if (pushbackMillis == null) {
1✔
1080
          if (isRetryableStatusCode) {
1✔
1081
            shouldRetry = true;
1✔
1082
            backoffNanos = intervalWithJitter(nextBackoffIntervalNanos);
1✔
1083
            nextBackoffIntervalNanos = Math.min(
1✔
1084
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1085
                retryPolicy.maxBackoffNanos);
1✔
1086
          } // else no retry
1087
        } else if (pushbackMillis >= 0) {
1✔
1088
          shouldRetry = true;
1✔
1089
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1090
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1091
        } // else no retry
1092
      } // else no retry
1093

1094
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1095
    }
1096

1097
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1098
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1099
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1100
      boolean isThrottled = false;
1✔
1101
      if (throttle != null) {
1✔
1102
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1103
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1104
        }
1105
      }
1106
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1107
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1108
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1109
      }
1110
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1111
    }
1112

1113
    @Nullable
1114
    private Integer getPushbackMills(Metadata trailer) {
1115
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1116
      Integer pushbackMillis = null;
1✔
1117
      if (pushbackStr != null) {
1✔
1118
        try {
1119
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1120
        } catch (NumberFormatException e) {
1✔
1121
          pushbackMillis = -1;
1✔
1122
        }
1✔
1123
      }
1124
      return pushbackMillis;
1✔
1125
    }
1126

1127
    @Override
1128
    public void messagesAvailable(final MessageProducer producer) {
1129
      State savedState = state;
1✔
1130
      checkState(
1✔
1131
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1132
      if (savedState.winningSubstream != substream) {
1✔
1133
        GrpcUtil.closeQuietly(producer);
1✔
1134
        return;
1✔
1135
      }
1136
      listenerSerializeExecutor.execute(
1✔
1137
          new Runnable() {
1✔
1138
            @Override
1139
            public void run() {
1140
              masterListener.messagesAvailable(producer);
1✔
1141
            }
1✔
1142
          });
1143
    }
1✔
1144

1145
    @Override
1146
    public void onReady() {
1147
      // FIXME(#7089): hedging case is broken.
1148
      if (!isReady()) {
1✔
1149
        return;
1✔
1150
      }
1151
      listenerSerializeExecutor.execute(
1✔
1152
          new Runnable() {
1✔
1153
            @Override
1154
            public void run() {
1155
              if (!isClosed) {
1✔
1156
                masterListener.onReady();
1✔
1157
              }
1158
            }
1✔
1159
          });
1160
    }
1✔
1161
  }
1162

1163
  private static final class State {
1164
    /** Committed and the winning substream drained. */
1165
    final boolean passThrough;
1166

1167
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1168
    @Nullable final List<BufferEntry> buffer;
1169

1170
    /**
1171
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1172
     * passThrough; Empty if committed but not passThrough.
1173
     */
1174
    final Collection<Substream> drainedSubstreams;
1175

1176
    /**
1177
     * Unmodifiable collection of all the active hedging substreams.
1178
     *
1179
     * <p>A substream even with the attribute substream.closed being true may be considered still
1180
     * "active" at the moment as long as it is in this collection.
1181
     */
1182
    final Collection<Substream> activeHedges; // not null once isHedging = true
1183

1184
    final int hedgingAttemptCount;
1185

1186
    /** Null until committed. */
1187
    @Nullable final Substream winningSubstream;
1188

1189
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1190
    final boolean cancelled;
1191

1192
    /** No more hedging due to events like drop or pushback. */
1193
    final boolean hedgingFrozen;
1194

1195
    State(
1196
        @Nullable List<BufferEntry> buffer,
1197
        Collection<Substream> drainedSubstreams,
1198
        Collection<Substream> activeHedges,
1199
        @Nullable Substream winningSubstream,
1200
        boolean cancelled,
1201
        boolean passThrough,
1202
        boolean hedgingFrozen,
1203
        int hedgingAttemptCount) {
1✔
1204
      this.buffer = buffer;
1✔
1205
      this.drainedSubstreams =
1✔
1206
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1207
      this.winningSubstream = winningSubstream;
1✔
1208
      this.activeHedges = activeHedges;
1✔
1209
      this.cancelled = cancelled;
1✔
1210
      this.passThrough = passThrough;
1✔
1211
      this.hedgingFrozen = hedgingFrozen;
1✔
1212
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1213

1214
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1215
      checkState(
1✔
1216
          !passThrough || winningSubstream != null,
1217
          "passThrough should imply winningSubstream != null");
1218
      checkState(
1✔
1219
          !passThrough
1220
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1221
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1222
          "passThrough should imply winningSubstream is drained");
1223
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1224
    }
1✔
1225

1226
    @CheckReturnValue
1227
    // GuardedBy RetriableStream.lock
1228
    State cancelled() {
1229
      return new State(
1✔
1230
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1231
          hedgingFrozen, hedgingAttemptCount);
1232
    }
1233

1234
    /** The given substream is drained. */
1235
    @CheckReturnValue
1236
    // GuardedBy RetriableStream.lock
1237
    State substreamDrained(Substream substream) {
1238
      checkState(!passThrough, "Already passThrough");
1✔
1239

1240
      Collection<Substream> drainedSubstreams;
1241
      
1242
      if (substream.closed) {
1✔
1243
        drainedSubstreams = this.drainedSubstreams;
1✔
1244
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1245
        // optimize for 0-retry, which is most of the cases.
1246
        drainedSubstreams = Collections.singletonList(substream);
1✔
1247
      } else {
1248
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1249
        drainedSubstreams.add(substream);
1✔
1250
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1251
      }
1252

1253
      boolean passThrough = winningSubstream != null;
1✔
1254

1255
      List<BufferEntry> buffer = this.buffer;
1✔
1256
      if (passThrough) {
1✔
1257
        checkState(
1✔
1258
            winningSubstream == substream, "Another RPC attempt has already committed");
1259
        buffer = null;
1✔
1260
      }
1261

1262
      return new State(
1✔
1263
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1264
          hedgingFrozen, hedgingAttemptCount);
1265
    }
1266

1267
    /** The given substream is closed. */
1268
    @CheckReturnValue
1269
    // GuardedBy RetriableStream.lock
1270
    State substreamClosed(Substream substream) {
1271
      substream.closed = true;
1✔
1272
      if (this.drainedSubstreams.contains(substream)) {
1✔
1273
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1274
        drainedSubstreams.remove(substream);
1✔
1275
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1276
        return new State(
1✔
1277
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1278
            hedgingFrozen, hedgingAttemptCount);
1279
      } else {
1280
        return this;
1✔
1281
      }
1282
    }
1283

1284
    @CheckReturnValue
1285
    // GuardedBy RetriableStream.lock
1286
    State committed(Substream winningSubstream) {
1287
      checkState(this.winningSubstream == null, "Already committed");
1✔
1288

1289
      boolean passThrough = false;
1✔
1290
      List<BufferEntry> buffer = this.buffer;
1✔
1291
      Collection<Substream> drainedSubstreams;
1292

1293
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1294
        passThrough = true;
1✔
1295
        buffer = null;
1✔
1296
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1297
      } else {
1298
        drainedSubstreams = Collections.emptyList();
1✔
1299
      }
1300

1301
      return new State(
1✔
1302
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1303
          hedgingFrozen, hedgingAttemptCount);
1304
    }
1305

1306
    @CheckReturnValue
1307
    // GuardedBy RetriableStream.lock
1308
    State freezeHedging() {
1309
      if (hedgingFrozen) {
1✔
1310
        return this;
×
1311
      }
1312
      return new State(
1✔
1313
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1314
          true, hedgingAttemptCount);
1315
    }
1316

1317
    @CheckReturnValue
1318
    // GuardedBy RetriableStream.lock
1319
    // state.hedgingAttemptCount is modified only here.
1320
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1321
    State addActiveHedge(Substream substream) {
1322
      // hasPotentialHedging must be true
1323
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1324
      checkState(winningSubstream == null, "already committed");
1✔
1325

1326
      Collection<Substream> activeHedges;
1327
      if (this.activeHedges == null) {
1✔
1328
        activeHedges = Collections.singleton(substream);
1✔
1329
      } else {
1330
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1331
        activeHedges.add(substream);
1✔
1332
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1333
      }
1334

1335
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1336
      return new State(
1✔
1337
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1338
          hedgingFrozen, hedgingAttemptCount);
1339
    }
1340

1341
    @CheckReturnValue
1342
    // GuardedBy RetriableStream.lock
1343
    // The method is only called in Sublistener.closed()
1344
    State removeActiveHedge(Substream substream) {
1345
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1346
      activeHedges.remove(substream);
1✔
1347
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1348

1349
      return new State(
1✔
1350
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1351
          hedgingFrozen, hedgingAttemptCount);
1352
    }
1353

1354
    @CheckReturnValue
1355
    // GuardedBy RetriableStream.lock
1356
    // The method is only called for transparent retry.
1357
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1358
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1359
      activeHedges.remove(oldOne);
1✔
1360
      activeHedges.add(newOne);
1✔
1361
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1362

1363
      return new State(
1✔
1364
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1365
          hedgingFrozen, hedgingAttemptCount);
1366
    }
1367
  }
1368

1369
  /**
1370
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1371
   *  attributes.
1372
   */
1373
  private static final class Substream {
1374
    ClientStream stream;
1375

1376
    // GuardedBy RetriableStream.lock
1377
    boolean closed;
1378

1379
    // setting to true must be GuardedBy RetriableStream.lock
1380
    boolean bufferLimitExceeded;
1381

1382
    final int previousAttemptCount;
1383

1384
    Substream(int previousAttemptCount) {
1✔
1385
      this.previousAttemptCount = previousAttemptCount;
1✔
1386
    }
1✔
1387
  }
1388

1389

1390
  /**
1391
   * Traces the buffer used by a substream.
1392
   */
1393
  class BufferSizeTracer extends ClientStreamTracer {
1394
    // Each buffer size tracer is dedicated to one specific substream.
1395
    private final Substream substream;
1396

1397
    @GuardedBy("lock")
1398
    long bufferNeeded;
1399

1400
    BufferSizeTracer(Substream substream) {
1✔
1401
      this.substream = substream;
1✔
1402
    }
1✔
1403

1404
    /**
1405
     * A message is sent to the wire, so its reference would be released if no retry or
1406
     * hedging were involved. So at this point we have to hold the reference of the message longer
1407
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1408
     */
1409
    @Override
1410
    public void outboundWireSize(long bytes) {
1411
      if (state.winningSubstream != null) {
1✔
1412
        return;
1✔
1413
      }
1414

1415
      Runnable postCommitTask = null;
1✔
1416

1417
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1418
      synchronized (lock) {
1✔
1419
        if (state.winningSubstream != null || substream.closed) {
1✔
1420
          return;
×
1421
        }
1422
        bufferNeeded += bytes;
1✔
1423
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1424
          return;
1✔
1425
        }
1426

1427
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1428
          substream.bufferLimitExceeded = true;
1✔
1429
        } else {
1430
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1431
          long savedChannelBufferUsed =
1✔
1432
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1433
          perRpcBufferUsed = bufferNeeded;
1✔
1434

1435
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1436
            substream.bufferLimitExceeded = true;
1✔
1437
          }
1438
        }
1439

1440
        if (substream.bufferLimitExceeded) {
1✔
1441
          postCommitTask = commit(substream);
1✔
1442
        }
1443
      }
1✔
1444

1445
      if (postCommitTask != null) {
1✔
1446
        postCommitTask.run();
1✔
1447
      }
1448
    }
1✔
1449
  }
1450

1451
  /**
1452
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1453
   *  the Channel. There should be a single instance of it for each channel.
1454
   */
1455
  static final class ChannelBufferMeter {
1✔
1456
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1457

1458
    @VisibleForTesting
1459
    long addAndGet(long newBytesUsed) {
1460
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1461
    }
1462
  }
1463

1464
  /**
1465
   * Used for retry throttling.
1466
   */
1467
  static final class Throttle {
1468

1469
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1470

1471
    /**
1472
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1473
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1474
     * maxTokens.
1475
     */
1476
    final int maxTokens;
1477

1478
    /**
1479
     * Half of {@code maxTokens}.
1480
     */
1481
    final int threshold;
1482

1483
    /**
1484
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1485
     */
1486
    final int tokenRatio;
1487

1488
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1489

1490
    Throttle(float maxTokens, float tokenRatio) {
1✔
1491
      // tokenRatio is up to 3 decimal places
1492
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1493
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1494
      this.threshold = this.maxTokens / 2;
1✔
1495
      tokenCount.set(this.maxTokens);
1✔
1496
    }
1✔
1497

1498
    @VisibleForTesting
1499
    boolean isAboveThreshold() {
1500
      return tokenCount.get() > threshold;
1✔
1501
    }
1502

1503
    /**
1504
     * Counts down the token on qualified failure and checks if it is above the threshold
1505
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1506
     * a not-to-retry pushback.
1507
     */
1508
    @VisibleForTesting
1509
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1510
      while (true) {
1511
        int currentCount = tokenCount.get();
1✔
1512
        if (currentCount == 0) {
1✔
1513
          return false;
1✔
1514
        }
1515
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1516
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1517
        if (updated) {
1✔
1518
          return decremented > threshold;
1✔
1519
        }
1520
      }
×
1521
    }
1522

1523
    @VisibleForTesting
1524
    void onSuccess() {
1525
      while (true) {
1526
        int currentCount = tokenCount.get();
1✔
1527
        if (currentCount == maxTokens) {
1✔
1528
          break;
1✔
1529
        }
1530
        int incremented = currentCount + tokenRatio;
1✔
1531
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1532
        if (updated) {
1✔
1533
          break;
1✔
1534
        }
1535
      }
×
1536
    }
1✔
1537

1538
    @Override
1539
    public boolean equals(Object o) {
1540
      if (this == o) {
1✔
1541
        return true;
×
1542
      }
1543
      if (!(o instanceof Throttle)) {
1✔
1544
        return false;
×
1545
      }
1546
      Throttle that = (Throttle) o;
1✔
1547
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1548
    }
1549

1550
    @Override
1551
    public int hashCode() {
1552
      return Objects.hashCode(maxTokens, tokenRatio);
×
1553
    }
1554
  }
1555

1556
  private static final class RetryPlan {
1557
    final boolean shouldRetry;
1558
    final long backoffNanos;
1559

1560
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1561
      this.shouldRetry = shouldRetry;
1✔
1562
      this.backoffNanos = backoffNanos;
1✔
1563
    }
1✔
1564
  }
1565

1566
  private static final class HedgingPlan {
1567
    final boolean isHedgeable;
1568
    @Nullable
1569
    final Integer hedgingPushbackMillis;
1570

1571
    public HedgingPlan(
1572
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1573
      this.isHedgeable = isHedgeable;
1✔
1574
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1575
    }
1✔
1576
  }
1577

1578
  /** Allows cancelling a Future without racing with setting the future. */
1579
  private static final class FutureCanceller {
1580

1581
    final Object lock;
1582
    @GuardedBy("lock")
1583
    Future<?> future;
1584
    @GuardedBy("lock")
1585
    boolean cancelled;
1586

1587
    FutureCanceller(Object lock) {
1✔
1588
      this.lock = lock;
1✔
1589
    }
1✔
1590

1591
    void setFuture(Future<?> future) {
1592
      boolean wasCancelled;
1593
      synchronized (lock) {
1✔
1594
        wasCancelled = cancelled;
1✔
1595
        if (!wasCancelled) {
1✔
1596
          this.future = future;
1✔
1597
        }
1598
      }
1✔
1599
      if (wasCancelled) {
1✔
1600
        future.cancel(false);
×
1601
      }
1602
    }
1✔
1603

1604
    @GuardedBy("lock")
1605
    @CheckForNull // Must cancel the returned future if not null.
1606
    Future<?> markCancelled() {
1607
      cancelled = true;
1✔
1608
      return future;
1✔
1609
    }
1610

1611
    @GuardedBy("lock")
1612
    boolean isCancelled() {
1613
      return cancelled;
1✔
1614
    }
1615
  }
1616
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc