• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19155

08 Apr 2024 06:30PM CUT coverage: 88.25% (+0.01%) from 88.24%
#19155

push

github

web-flow
buildscripts: Migrate PSM Interop to Artifact Registry (#11079) (#11091)

From Container Registry (gcr.io) to Artifact Registry (pkg.dev).

30319 of 34356 relevant lines covered (88.25%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.41
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import io.grpc.Attributes;
26
import io.grpc.ClientStreamTracer;
27
import io.grpc.Compressor;
28
import io.grpc.Deadline;
29
import io.grpc.DecompressorRegistry;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import java.io.InputStream;
36
import java.lang.Thread.UncaughtExceptionHandler;
37
import java.util.ArrayList;
38
import java.util.Collection;
39
import java.util.Collections;
40
import java.util.List;
41
import java.util.Random;
42
import java.util.concurrent.Executor;
43
import java.util.concurrent.Future;
44
import java.util.concurrent.ScheduledExecutorService;
45
import java.util.concurrent.TimeUnit;
46
import java.util.concurrent.atomic.AtomicBoolean;
47
import java.util.concurrent.atomic.AtomicInteger;
48
import java.util.concurrent.atomic.AtomicLong;
49
import javax.annotation.CheckForNull;
50
import javax.annotation.CheckReturnValue;
51
import javax.annotation.Nullable;
52
import javax.annotation.concurrent.GuardedBy;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  @VisibleForTesting
57
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
1✔
58
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
1✔
59

60
  @VisibleForTesting
61
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
1✔
62
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
1✔
63

64
  private static final Status CANCELLED_BECAUSE_COMMITTED =
1✔
65
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
1✔
66

67
  private final MethodDescriptor<ReqT, ?> method;
68
  private final Executor callExecutor;
69
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
1✔
70
      new UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          throw Status.fromThrowable(e)
×
74
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
×
75
              .asRuntimeException();
×
76
        }
77
      }
78
  );
79
  private final ScheduledExecutorService scheduledExecutorService;
80
  // Must not modify it.
81
  private final Metadata headers;
82
  @Nullable
83
  private final RetryPolicy retryPolicy;
84
  @Nullable
85
  private final HedgingPolicy hedgingPolicy;
86
  private final boolean isHedging;
87

88
  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89
  private final Object lock = new Object();
1✔
90

91
  private final ChannelBufferMeter channelBufferUsed;
92
  private final long perRpcBufferLimit;
93
  private final long channelBufferLimit;
94
  @Nullable
95
  private final Throttle throttle;
96
  @GuardedBy("lock")
1✔
97
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98

99
  private volatile State state = new State(
1✔
100
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
1✔
101
      false, 0);
102

103
  /**
104
   * Either non-local transparent retry happened or reached server's application logic.
105
   *
106
   * <p>Note that local-only transparent retries are unlimited.
107
   */
108
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
1✔
109
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
1✔
110
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
1✔
111
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112

113
  // Used for recording the share of buffer used for the current call out of the channel buffer.
114
  // This field would not be necessary if there were no channel buffer limit.
115
  @GuardedBy("lock")
116
  private long perRpcBufferUsed;
117

118
  private ClientStreamListener masterListener;
119
  @GuardedBy("lock")
120
  private FutureCanceller scheduledRetry;
121
  @GuardedBy("lock")
122
  private FutureCanceller scheduledHedging;
123
  private long nextBackoffIntervalNanos;
124
  private Status cancellationStatus;
125
  private boolean isClosed;
126

127
  /**
   * Creates a retriable stream for the given method.
   *
   * <p>At most one of {@code retryPolicy} and {@code hedgingPolicy} may be non-null. Hedging is
   * enabled exactly when a hedging policy is supplied; when a retry policy is supplied its initial
   * backoff seeds the next backoff interval.
   *
   * @param method descriptor of the RPC method
   * @param headers headers of the logical call; they are never modified, each attempt gets a copy
   * @param channelBufferUsed meter shared across the channel for buffered bytes
   * @param perRpcBufferLimit buffer limit for this RPC
   * @param channelBufferLimit buffer limit for the whole channel
   * @param callExecutor executor on which post-commit and hedging tasks are run
   * @param scheduledExecutorService scheduler used for delayed hedging attempts
   * @param retryPolicy retry policy, or {@code null}
   * @param hedgingPolicy hedging policy, or {@code null}
   * @param throttle throttle consulted before scheduling hedges, or {@code null}
   * @throws IllegalArgumentException if both a retry policy and a hedging policy are provided
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method,
      Metadata headers,
      ChannelBufferMeter channelBufferUsed,
      long perRpcBufferLimit,
      long channelBufferLimit,
      Executor callExecutor,
      ScheduledExecutorService scheduledExecutorService,
      @Nullable RetryPolicy retryPolicy,
      @Nullable HedgingPolicy hedgingPolicy,
      @Nullable Throttle throttle) {
    this.method = method;
    this.headers = headers;
    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;

    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;
    this.throttle = throttle;

    this.retryPolicy = retryPolicy;
    this.hedgingPolicy = hedgingPolicy;
    this.isHedging = hedgingPolicy != null;
    if (retryPolicy != null) {
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
    }

    checkArgument(
        retryPolicy == null || hedgingPolicy == null,
        "Should not provide both retryPolicy and hedgingPolicy");
  }
151

152
  @SuppressWarnings("GuardedBy")
153
  @Nullable // null if already committed
154
  @CheckReturnValue
155
  private Runnable commit(final Substream winningSubstream) {
156

157
    synchronized (lock) {
1✔
158
      if (state.winningSubstream != null) {
1✔
159
        return null;
1✔
160
      }
161
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
1✔
162

163
      state = state.committed(winningSubstream);
1✔
164

165
      // subtract the share of this RPC from channelBufferUsed.
166
      channelBufferUsed.addAndGet(-perRpcBufferUsed);
1✔
167

168
      final Future<?> retryFuture;
169
      if (scheduledRetry != null) {
1✔
170
        // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171
        // found: 'this.lock'
172
        retryFuture = scheduledRetry.markCancelled();
1✔
173
        scheduledRetry = null;
1✔
174
      } else {
175
        retryFuture = null;
1✔
176
      }
177
      // cancel the scheduled hedging if it is scheduled prior to the commitment
178
      final Future<?> hedgingFuture;
179
      if (scheduledHedging != null) {
1✔
180
        // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181
        // found: 'this.lock'
182
        hedgingFuture = scheduledHedging.markCancelled();
1✔
183
        scheduledHedging = null;
1✔
184
      } else {
185
        hedgingFuture = null;
1✔
186
      }
187

188
      class CommitTask implements Runnable {
1✔
189
        @Override
190
        public void run() {
191
          // For hedging only, not needed for normal retry
192
          for (Substream substream : savedDrainedSubstreams) {
1✔
193
            if (substream != winningSubstream) {
1✔
194
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
1✔
195
            }
196
          }
1✔
197
          if (retryFuture != null) {
1✔
198
            boolean cancelled = retryFuture.cancel(false);
1✔
199
            if (cancelled) {
1✔
200
              inFlightSubStreams.decrementAndGet();
1✔
201
            }
202
          }
203
          if (hedgingFuture != null) {
1✔
204
            hedgingFuture.cancel(false);
1✔
205
          }
206

207
          postCommit();
1✔
208
        }
1✔
209
      }
210

211
      return new CommitTask();
1✔
212
    }
213
  }
214

215
  /**
   * Hook invoked as the last step of the task returned by {@code commit()}, after the non-winning
   * drained substreams and any scheduled retry/hedging futures have been cancelled.
   */
  abstract void postCommit();
216

217
  /**
218
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
219
   * for only once. The post commit task cancels other non-winning streams on separate transport
220
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
221
   * transports.(issues/10314)
222
   * This method should be called only in subListener callbacks. This guarantees callExecutor
223
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
224
   * decorative. That is because:
225
   * For a successful winning stream, other streams won't attempt to close master listener.
226
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
227
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
228
   * callExecutor scheduling happens-before that.
229
   */
230
  private void commitAndRun(Substream winningSubstream) {
231
    Runnable postCommitTask = commit(winningSubstream);
1✔
232

233
    if (postCommitTask != null) {
1✔
234
      callExecutor.execute(postCommitTask);
1✔
235
    }
236
  }
1✔
237

238
  // returns null means we should not create new sub streams, e.g. cancelled or
239
  // other close condition is met for retriableStream.
240
  @Nullable
241
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
242
    int inFlight;
243
    do {
244
      inFlight = inFlightSubStreams.get();
1✔
245
      if (inFlight < 0) {
1✔
246
        return null;
×
247
      }
248
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
249
    Substream sub = new Substream(previousAttemptCount);
1✔
250
    // one tracer per substream
251
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
252
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
253
      @Override
254
      public ClientStreamTracer newClientStreamTracer(
255
          ClientStreamTracer.StreamInfo info, Metadata headers) {
256
        return bufferSizeTracer;
1✔
257
      }
258
    };
259

260
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
261
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
262
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
1✔
263
    return sub;
1✔
264
  }
265

266
  /**
267
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
268
   * Client stream is not yet started.
269
   */
270
  abstract ClientStream newSubstream(
271
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
272
      boolean isTransparentRetry);
273

274
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
275
  @VisibleForTesting
276
  final Metadata updateHeaders(
277
      Metadata originalHeaders, int previousAttemptCount) {
278
    Metadata newHeaders = new Metadata();
1✔
279
    newHeaders.merge(originalHeaders);
1✔
280
    if (previousAttemptCount > 0) {
1✔
281
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
282
    }
283
    return newHeaders;
1✔
284
  }
285

286
  private void drain(Substream substream) {
287
    int index = 0;
1✔
288
    int chunk = 0x80;
1✔
289
    List<BufferEntry> list = null;
1✔
290
    boolean streamStarted = false;
1✔
291
    Runnable onReadyRunnable = null;
1✔
292

293
    while (true) {
294
      State savedState;
295

296
      synchronized (lock) {
1✔
297
        savedState = state;
1✔
298
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
299
          // committed but not me, to be cancelled
300
          break;
1✔
301
        }
302
        if (savedState.cancelled) {
1✔
303
          break;
1✔
304
        }
305
        if (index == savedState.buffer.size()) { // I'm drained
1✔
306
          state = savedState.substreamDrained(substream);
1✔
307
          if (!isReady()) {
1✔
308
            return;
1✔
309
          }
310
          onReadyRunnable = new Runnable() {
1✔
311
            @Override
312
            public void run() {
313
              if (!isClosed) {
1✔
314
                masterListener.onReady();
1✔
315
              }
316
            }
1✔
317
          };
318
          break;
1✔
319
        }
320

321
        if (substream.closed) {
1✔
322
          return;
×
323
        }
324

325
        int stop = Math.min(index + chunk, savedState.buffer.size());
1✔
326
        if (list == null) {
1✔
327
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
1✔
328
        } else {
329
          list.clear();
1✔
330
          list.addAll(savedState.buffer.subList(index, stop));
1✔
331
        }
332
        index = stop;
1✔
333
      }
1✔
334

335
      for (BufferEntry bufferEntry : list) {
1✔
336
        bufferEntry.runWith(substream);
1✔
337
        if (bufferEntry instanceof RetriableStream.StartEntry) {
1✔
338
          streamStarted = true;
1✔
339
        }
340
        savedState = state;
1✔
341
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
342
          // committed but not me, to be cancelled
343
          break;
1✔
344
        }
345
        if (savedState.cancelled) {
1✔
346
          break;
1✔
347
        }
348
      }
1✔
349
    }
1✔
350

351
    if (onReadyRunnable != null) {
1✔
352
      listenerSerializeExecutor.execute(onReadyRunnable);
1✔
353
      return;
1✔
354
    }
355

356
    if (!streamStarted) {
1✔
357
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
358
      substream.stream.start(new Sublistener(substream));
1✔
359
    }
360
    substream.stream.cancel(
1✔
361
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
1✔
362
  }
1✔
363

364
  /**
365
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
366
   */
367
  @CheckReturnValue
368
  @Nullable
369
  abstract Status prestart();
370

371
  class StartEntry implements BufferEntry {
1✔
372
    @Override
373
    public void runWith(Substream substream) {
374
      substream.stream.start(new Sublistener(substream));
1✔
375
    }
1✔
376
  }
377

378
  /** Starts the first PRC attempt. */
379
  @Override
380
  public final void start(ClientStreamListener listener) {
381
    masterListener = listener;
1✔
382

383
    Status shutdownStatus = prestart();
1✔
384

385
    if (shutdownStatus != null) {
1✔
386
      cancel(shutdownStatus);
1✔
387
      return;
1✔
388
    }
389

390
    synchronized (lock) {
1✔
391
      state.buffer.add(new StartEntry());
1✔
392
    }
1✔
393

394
    Substream substream = createSubstream(0, false);
1✔
395
    if (substream == null) {
1✔
396
      return;
×
397
    }
398
    if (isHedging) {
1✔
399
      FutureCanceller scheduledHedgingRef = null;
1✔
400

401
      synchronized (lock) {
1✔
402
        state = state.addActiveHedge(substream);
1✔
403
        if (hasPotentialHedging(state)
1✔
404
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
405
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
406
        }
407
      }
1✔
408

409
      if (scheduledHedgingRef != null) {
1✔
410
        scheduledHedgingRef.setFuture(
1✔
411
            scheduledExecutorService.schedule(
1✔
412
                new HedgingRunnable(scheduledHedgingRef),
413
                hedgingPolicy.hedgingDelayNanos,
414
                TimeUnit.NANOSECONDS));
415
      }
416
    }
417

418
    drain(substream);
1✔
419
  }
1✔
420

421
  @SuppressWarnings("GuardedBy")
422
  private void pushbackHedging(@Nullable Integer delayMillis) {
423
    if (delayMillis == null) {
1✔
424
      return;
1✔
425
    }
426
    if (delayMillis < 0) {
1✔
427
      freezeHedging();
1✔
428
      return;
1✔
429
    }
430

431
    // Cancels the current scheduledHedging and reschedules a new one.
432
    FutureCanceller future;
433
    Future<?> futureToBeCancelled;
434

435
    synchronized (lock) {
1✔
436
      if (scheduledHedging == null) {
1✔
437
        return;
×
438
      }
439

440
      // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
441
      // found: 'this.lock'
442
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
443
      scheduledHedging = future = new FutureCanceller(lock);
1✔
444
    }
1✔
445

446
    if (futureToBeCancelled != null) {
1✔
447
      futureToBeCancelled.cancel(false);
1✔
448
    }
449
    future.setFuture(scheduledExecutorService.schedule(
1✔
450
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
451
  }
1✔
452

453
  private final class HedgingRunnable implements Runnable {
454

455
    // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
456
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
457
    final FutureCanceller scheduledHedgingRef;
458

459
    HedgingRunnable(FutureCanceller scheduledHedging) {
1✔
460
      scheduledHedgingRef = scheduledHedging;
1✔
461
    }
1✔
462

463
    @Override
464
    public void run() {
465
      // It's safe to read state.hedgingAttemptCount here.
466
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
467
      // until state.addActiveHedge() is called subsequently, even the state could possibly
468
      // change.
469
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
1✔
470
      if (newSubstream == null) {
1✔
471
        return;
×
472
      }
473
      callExecutor.execute(
1✔
474
          new Runnable() {
1✔
475
            @SuppressWarnings("GuardedBy")
476
            @Override
477
            public void run() {
478
              boolean cancelled = false;
1✔
479
              FutureCanceller future = null;
1✔
480

481
              synchronized (lock) {
1✔
482
                // TODO(b/145386688): This access should be guarded by
483
                // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
484
                // 'RetriableStream.this.lock'
485
                if (scheduledHedgingRef.isCancelled()) {
1✔
486
                  cancelled = true;
×
487
                } else {
488
                  state = state.addActiveHedge(newSubstream);
1✔
489
                  if (hasPotentialHedging(state)
1✔
490
                      && (throttle == null || throttle.isAboveThreshold())) {
1✔
491
                    scheduledHedging = future = new FutureCanceller(lock);
1✔
492
                  } else {
493
                    state = state.freezeHedging();
1✔
494
                    scheduledHedging = null;
1✔
495
                  }
496
                }
497
              }
1✔
498

499
              if (cancelled) {
1✔
500
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
501
                newSubstream.stream.start(new Sublistener(newSubstream));
×
502
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
×
503
                return;
×
504
              }
505
              if (future != null) {
1✔
506
                future.setFuture(
1✔
507
                    scheduledExecutorService.schedule(
1✔
508
                        new HedgingRunnable(future),
509
                        hedgingPolicy.hedgingDelayNanos,
1✔
510
                        TimeUnit.NANOSECONDS));
511
              }
512
              drain(newSubstream);
1✔
513
            }
1✔
514
          });
515
    }
1✔
516
  }
517

518
  @Override
519
  public final void cancel(final Status reason) {
520
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
1✔
521
    noopSubstream.stream = new NoopClientStream();
1✔
522
    Runnable runnable = commit(noopSubstream);
1✔
523

524
    if (runnable != null) {
1✔
525
      synchronized (lock) {
1✔
526
        state = state.substreamDrained(noopSubstream);
1✔
527
      }
1✔
528
      runnable.run();
1✔
529
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
1✔
530
      return;
1✔
531
    }
532

533
    Substream winningSubstreamToCancel = null;
1✔
534
    synchronized (lock) {
1✔
535
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
1✔
536
        winningSubstreamToCancel = state.winningSubstream;
1✔
537
      } else { // the winningSubstream will be cancelled while draining
538
        cancellationStatus = reason;
1✔
539
      }
540
      state = state.cancelled();
1✔
541
    }
1✔
542
    if (winningSubstreamToCancel != null) {
1✔
543
      winningSubstreamToCancel.stream.cancel(reason);
1✔
544
    }
545
  }
1✔
546

547
  private void delayOrExecute(BufferEntry bufferEntry) {
548
    Collection<Substream> savedDrainedSubstreams;
549
    synchronized (lock) {
1✔
550
      if (!state.passThrough) {
1✔
551
        state.buffer.add(bufferEntry);
1✔
552
      }
553
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
554
    }
1✔
555

556
    for (Substream substream : savedDrainedSubstreams) {
1✔
557
      bufferEntry.runWith(substream);
1✔
558
    }
1✔
559
  }
1✔
560

561
  /**
562
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
563
   * InputStream for buffering.
564
   */
565
  @Override
566
  public final void writeMessage(InputStream message) {
567
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
×
568
  }
569

570
  final void sendMessage(final ReqT message) {
571
    State savedState = state;
1✔
572
    if (savedState.passThrough) {
1✔
573
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
574
      return;
1✔
575
    }
576

577
    class SendMessageEntry implements BufferEntry {
1✔
578
      @Override
579
      public void runWith(Substream substream) {
580
        substream.stream.writeMessage(method.streamRequest(message));
1✔
581
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
582
        // flushes (or half close), but retry appears to have a code path that the flushes may
583
        // not happen. The code needs to be fixed and this removed. See #9340.
584
        substream.stream.flush();
1✔
585
      }
1✔
586
    }
587

588
    delayOrExecute(new SendMessageEntry());
1✔
589
  }
1✔
590

591
  @Override
592
  public final void request(final int numMessages) {
593
    State savedState = state;
1✔
594
    if (savedState.passThrough) {
1✔
595
      savedState.winningSubstream.stream.request(numMessages);
1✔
596
      return;
1✔
597
    }
598

599
    class RequestEntry implements BufferEntry {
1✔
600
      @Override
601
      public void runWith(Substream substream) {
602
        substream.stream.request(numMessages);
1✔
603
      }
1✔
604
    }
605

606
    delayOrExecute(new RequestEntry());
1✔
607
  }
1✔
608

609
  @Override
610
  public final void flush() {
611
    State savedState = state;
1✔
612
    if (savedState.passThrough) {
1✔
613
      savedState.winningSubstream.stream.flush();
1✔
614
      return;
1✔
615
    }
616

617
    class FlushEntry implements BufferEntry {
1✔
618
      @Override
619
      public void runWith(Substream substream) {
620
        substream.stream.flush();
1✔
621
      }
1✔
622
    }
623

624
    delayOrExecute(new FlushEntry());
1✔
625
  }
1✔
626

627
  @Override
628
  public final boolean isReady() {
629
    for (Substream substream : state.drainedSubstreams) {
1✔
630
      if (substream.stream.isReady()) {
1✔
631
        return true;
1✔
632
      }
633
    }
1✔
634
    return false;
1✔
635
  }
636

637
  @Override
638
  public void optimizeForDirectExecutor() {
639
    class OptimizeDirectEntry implements BufferEntry {
1✔
640
      @Override
641
      public void runWith(Substream substream) {
642
        substream.stream.optimizeForDirectExecutor();
1✔
643
      }
1✔
644
    }
645

646
    delayOrExecute(new OptimizeDirectEntry());
1✔
647
  }
1✔
648

649
  @Override
650
  public final void setCompressor(final Compressor compressor) {
651
    class CompressorEntry implements BufferEntry {
1✔
652
      @Override
653
      public void runWith(Substream substream) {
654
        substream.stream.setCompressor(compressor);
1✔
655
      }
1✔
656
    }
657

658
    delayOrExecute(new CompressorEntry());
1✔
659
  }
1✔
660

661
  @Override
662
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
663
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
664
      @Override
665
      public void runWith(Substream substream) {
666
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
667
      }
1✔
668
    }
669

670
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
671
  }
1✔
672

673
  @Override
674
  public final void setMessageCompression(final boolean enable) {
675
    class MessageCompressionEntry implements BufferEntry {
1✔
676
      @Override
677
      public void runWith(Substream substream) {
678
        substream.stream.setMessageCompression(enable);
1✔
679
      }
1✔
680
    }
681

682
    delayOrExecute(new MessageCompressionEntry());
1✔
683
  }
1✔
684

685
  @Override
686
  public final void halfClose() {
687
    class HalfCloseEntry implements BufferEntry {
1✔
688
      @Override
689
      public void runWith(Substream substream) {
690
        substream.stream.halfClose();
1✔
691
      }
1✔
692
    }
693

694
    delayOrExecute(new HalfCloseEntry());
1✔
695
  }
1✔
696

697
  @Override
698
  public final void setAuthority(final String authority) {
699
    class AuthorityEntry implements BufferEntry {
1✔
700
      @Override
701
      public void runWith(Substream substream) {
702
        substream.stream.setAuthority(authority);
1✔
703
      }
1✔
704
    }
705

706
    delayOrExecute(new AuthorityEntry());
1✔
707
  }
1✔
708

709
  @Override
710
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
711
    class DecompressorRegistryEntry implements BufferEntry {
1✔
712
      @Override
713
      public void runWith(Substream substream) {
714
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
715
      }
1✔
716
    }
717

718
    delayOrExecute(new DecompressorRegistryEntry());
1✔
719
  }
1✔
720

721
  @Override
722
  public final void setMaxInboundMessageSize(final int maxSize) {
723
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
724
      @Override
725
      public void runWith(Substream substream) {
726
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
727
      }
1✔
728
    }
729

730
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
731
  }
1✔
732

733
  @Override
734
  public final void setMaxOutboundMessageSize(final int maxSize) {
735
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
736
      @Override
737
      public void runWith(Substream substream) {
738
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
739
      }
1✔
740
    }
741

742
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
743
  }
1✔
744

745
  @Override
746
  public final void setDeadline(final Deadline deadline) {
747
    class DeadlineEntry implements BufferEntry {
1✔
748
      @Override
749
      public void runWith(Substream substream) {
750
        substream.stream.setDeadline(deadline);
1✔
751
      }
1✔
752
    }
753

754
    delayOrExecute(new DeadlineEntry());
1✔
755
  }
1✔
756

757
  @Override
758
  public final Attributes getAttributes() {
759
    if (state.winningSubstream != null) {
1✔
760
      return state.winningSubstream.stream.getAttributes();
1✔
761
    }
762
    return Attributes.EMPTY;
×
763
  }
764

765
  @Override
766
  public void appendTimeoutInsight(InsightBuilder insight) {
767
    State currentState;
768
    synchronized (lock) {
1✔
769
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
770
      currentState = state;
1✔
771
    }
1✔
772
    if (currentState.winningSubstream != null) {
1✔
773
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
774
      // of the winning substream, they may not have received closed() notifications yet, thus they
775
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
776
      // We need to figure out how to include them.
777
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
778
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
779
      insight.appendKeyValue("committed", substreamInsight);
1✔
780
    } else {
1✔
781
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
782
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
783
      // and are still catching up with buffered requests (in other words, still draining) will not
784
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
785
      // be indistinguishable from the case where those streams are to be created a little late due
786
      // to delays in the timer.
787
      for (Substream sub : currentState.drainedSubstreams) {
1✔
788
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
789
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
790
        openSubstreamsInsight.append(substreamInsight);
1✔
791
      }
1✔
792
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
793
    }
794
  }
1✔
795

796
  private static Random random = new Random();
1✔
797

798
  @VisibleForTesting
799
  static void setRandom(Random random) {
800
    RetriableStream.random = random;
1✔
801
  }
1✔
802

803
  /**
804
   * Whether there is any potential hedge at the moment. A false return value implies there is
805
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
806
   * when calling this method, unless otherwise the rpc is committed.
807
   */
808
  // only called when isHedging is true
809
  @GuardedBy("lock")
810
  private boolean hasPotentialHedging(State state) {
811
    return state.winningSubstream == null
1✔
812
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
813
        && !state.hedgingFrozen;
814
  }
815

816
  @SuppressWarnings("GuardedBy")
817
  private void freezeHedging() {
818
    Future<?> futureToBeCancelled = null;
1✔
819
    synchronized (lock) {
1✔
820
      if (scheduledHedging != null) {
1✔
821
        // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
822
        // found: 'this.lock'
823
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
824
        scheduledHedging = null;
1✔
825
      }
826
      state = state.freezeHedging();
1✔
827
    }
1✔
828

829
    if (futureToBeCancelled != null) {
1✔
830
      futureToBeCancelled.cancel(false);
1✔
831
    }
832
  }
1✔
833

834
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
835
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
836
        metadata);
837
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
838
      listenerSerializeExecutor.execute(
1✔
839
          new Runnable() {
1✔
840
            @Override
841
            public void run() {
842
              isClosed = true;
1✔
843
              masterListener.closed(status, progress, metadata);
1✔
844
            }
1✔
845
          });
846
    }
847
  }
1✔
848

849
  private static final class SavedCloseMasterListenerReason {
850
    private final Status status;
851
    private final RpcProgress progress;
852
    private final Metadata metadata;
853

854
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
855
      this.status = status;
1✔
856
      this.progress = progress;
1✔
857
      this.metadata = metadata;
1✔
858
    }
1✔
859
  }
860

861
  /**
   * A buffered stream operation that can be replayed against a substream. {@code State.buffer}
   * holds a list of these entries until the state becomes passThrough.
   */
  private interface BufferEntry {
862
    /** Replays the buffer entry with the given stream. */
863
    void runWith(Substream substream);
864
  }
865

866
  private final class Sublistener implements ClientStreamListener {
1✔
867
    // The single substream (retry/hedging attempt) whose events this listener handles.
    final Substream substream;
868

869
    /** Creates a listener bound to the given substream. */
    Sublistener(Substream substream) {
1✔
870
      this.substream = substream;
1✔
871
    }
1✔
872

873
    @Override
874
    public void headersRead(final Metadata headers) {
875
      if (substream.previousAttemptCount > 0) {
1✔
876
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
877
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
878
      }
879
      commitAndRun(substream);
1✔
880
      if (state.winningSubstream == substream) {
1✔
881
        if (throttle != null) {
1✔
882
          throttle.onSuccess();
1✔
883
        }
884
        listenerSerializeExecutor.execute(
1✔
885
            new Runnable() {
1✔
886
              @Override
887
              public void run() {
888
                masterListener.headersRead(headers);
1✔
889
              }
1✔
890
            });
891
      }
892
    }
1✔
893

894
    /**
     * Handles the closure of this substream and decides what happens to the overall RPC.
     *
     * <p>In order, the method: records the closure in {@code state} and in
     * {@code closedSubstreamsInsight}; delivers a previously saved close reason to the master
     * listener if this was the last in-flight substream after a close was requested; commits
     * immediately when the buffer limit was exceeded or when too many transparent retries
     * happened; otherwise, while uncommitted, either starts a transparent retry, freezes hedging
     * on a drop, waits for other hedges, or schedules a regular retry with backoff. If none of
     * those paths returns early, it tries to commit this substream and, if it wins, closes the
     * master listener with this substream's result.
     *
     * @param status the status the substream closed with
     * @param rpcProgress how far the attempt progressed; selects the retry path taken
     * @param trailers trailing metadata; consulted for server pushback
     */
    @Override
895
    public void closed(
896
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
897
      synchronized (lock) {
1✔
898
        state = state.substreamClosed(substream);
1✔
899
        closedSubstreamsInsight.append(status.getCode());
1✔
900
      }
1✔
901

902
      // safeCloseMasterListener() adds Integer.MIN_VALUE to inFlightSubStreams when a close is
      // requested, so reaching exactly Integer.MIN_VALUE here means a close was requested and this
      // was the last substream still in flight: deliver the saved close reason now.
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
903
        assert savedCloseMasterListenerReason != null;
1✔
904
        listenerSerializeExecutor.execute(
1✔
905
            new Runnable() {
1✔
906
              @Override
907
              public void run() {
908
                isClosed = true;
1✔
909
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
910
                    savedCloseMasterListenerReason.progress,
1✔
911
                    savedCloseMasterListenerReason.metadata);
1✔
912
              }
1✔
913
            });
914
        return;
1✔
915
      }
916

917
      // handle a race between buffer limit exceeded and closed, when setting
918
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
919
      if (substream.bufferLimitExceeded) {
1✔
920
        commitAndRun(substream);
1✔
921
        if (state.winningSubstream == substream) {
1✔
922
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
923
        }
924
        return;
1✔
925
      }
926
      // Cap on MISCARRIED-triggered transparent retries: past 1,000 of them, stop retrying and
      // fail the RPC with INTERNAL, keeping the original status as the cause.
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
927
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
928
        commitAndRun(substream);
1✔
929
        if (state.winningSubstream == substream) {
1✔
930
          Status tooManyTransparentRetries = Status.INTERNAL
1✔
931
              .withDescription("Too many transparent retries. Might be a bug in gRPC")
1✔
932
              .withCause(status.asRuntimeException());
1✔
933
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
934
        }
935
        return;
1✔
936
      }
937

938
      if (state.winningSubstream == null) {
1✔
939
        // MISCARRIED always qualifies for a transparent retry; REFUSED qualifies only for the
        // first caller that flips noMoreTransparentRetry from false to true.
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
940
            || (rpcProgress == RpcProgress.REFUSED
941
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
942
          // transparent retry
943
          final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
1✔
944
          if (newSubstream == null) {
1✔
945
            return;
×
946
          }
947
          if (isHedging) {
1✔
948
            synchronized (lock) {
1✔
949
              // Although this operation is not done atomically with
950
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
951
              // activeHedges, so neither does it affect the commitment decision of other threads,
952
              // nor do the commitment decision making threads affect itself.
953
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
954
            }
1✔
955
          }
956

957
          callExecutor.execute(new Runnable() {
1✔
958
            @Override
959
            public void run() {
960
              drain(newSubstream);
1✔
961
            }
1✔
962
          });
963
          return;
1✔
964
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
965
          // For normal retry, nothing need be done here, will just commit.
966
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
967
          if (isHedging) {
1✔
968
            freezeHedging();
×
969
          }
970
        } else {
971
          noMoreTransparentRetry.set(true);
1✔
972

973
          if (isHedging) {
1✔
974
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
975
            if (hedgingPlan.isHedgeable) {
1✔
976
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
977
            }
978
            synchronized (lock) {
1✔
979
              state = state.removeActiveHedge(substream);
1✔
980
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
981
              // Once hasPotentialHedging(state) is false, it will always be false, and then
982
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
983
              // multiple concurrent hedges, one of the hedges will end up committed.
984
              if (hedgingPlan.isHedgeable) {
1✔
985
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
986
                  return;
1✔
987
                }
988
                // else, no activeHedges, no new hedges possible, try to commit
989
              } // else, isHedgeable is false, try to commit
990
            }
1✔
991
          } else {
1✔
992
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
993
            if (retryPlan.shouldRetry) {
1✔
994
              // retry
995
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
1✔
996
              if (newSubstream == null) {
1✔
997
                return;
×
998
              }
999
              // The check state.winningSubstream == null, checking if is not already committed, is
1000
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1001
              FutureCanceller scheduledRetryCopy;
1002
              synchronized (lock) {
1✔
1003
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1004
              }
1✔
1005
              // Runs on the scheduler once the backoff elapses and hands the actual draining of
              // the new substream over to callExecutor.
              class RetryBackoffRunnable implements Runnable {
1✔
1006
                @Override
1007
                public void run() {
1008
                  callExecutor.execute(
1✔
1009
                      new Runnable() {
1✔
1010
                        @Override
1011
                        public void run() {
1012
                          drain(newSubstream);
1✔
1013
                        }
1✔
1014
                      });
1015
                }
1✔
1016
              }
1017

1018
              scheduledRetryCopy.setFuture(
1✔
1019
                  scheduledExecutorService.schedule(
1✔
1020
                      new RetryBackoffRunnable(),
1021
                      retryPlan.backoffNanos,
1022
                      TimeUnit.NANOSECONDS));
1023
              return;
1✔
1024
            }
1025
          }
1026
        }
1027
      }
1028

1029
      // No retry or hedge follows from this closure: try to commit this substream and, if it is
      // the winner, report its result to the master listener.
      commitAndRun(substream);
1✔
1030
      if (state.winningSubstream == substream) {
1✔
1031
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1032
      }
1033
    }
1✔
1034

1035
    /**
1036
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1037
     * long the backoff should be. The decision does not take the commitment status into account, so
1038
     * caller should check it separately. It also updates the throttle. It does not change state.
1039
     */
1040
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1041
      if (retryPolicy == null) {
1✔
1042
        return new RetryPlan(false, 0);
1✔
1043
      }
1044
      boolean shouldRetry = false;
1✔
1045
      long backoffNanos = 0L;
1✔
1046
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1047
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1048
      boolean isThrottled = false;
1✔
1049
      if (throttle != null) {
1✔
1050
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1051
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1052
        }
1053
      }
1054

1055
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1056
        if (pushbackMillis == null) {
1✔
1057
          if (isRetryableStatusCode) {
1✔
1058
            shouldRetry = true;
1✔
1059
            backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
1✔
1060
            nextBackoffIntervalNanos = Math.min(
1✔
1061
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1062
                retryPolicy.maxBackoffNanos);
1✔
1063
          } // else no retry
1064
        } else if (pushbackMillis >= 0) {
1✔
1065
          shouldRetry = true;
1✔
1066
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1067
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1068
        } // else no retry
1069
      } // else no retry
1070

1071
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1072
    }
1073

1074
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1075
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1076
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1077
      boolean isThrottled = false;
1✔
1078
      if (throttle != null) {
1✔
1079
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1080
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1081
        }
1082
      }
1083
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1084
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1085
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1086
      }
1087
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1088
    }
1089

1090
    @Nullable
1091
    private Integer getPushbackMills(Metadata trailer) {
1092
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1093
      Integer pushbackMillis = null;
1✔
1094
      if (pushbackStr != null) {
1✔
1095
        try {
1096
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1097
        } catch (NumberFormatException e) {
1✔
1098
          pushbackMillis = -1;
1✔
1099
        }
1✔
1100
      }
1101
      return pushbackMillis;
1✔
1102
    }
1103

1104
    @Override
1105
    public void messagesAvailable(final MessageProducer producer) {
1106
      State savedState = state;
1✔
1107
      checkState(
1✔
1108
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1109
      if (savedState.winningSubstream != substream) {
1✔
1110
        GrpcUtil.closeQuietly(producer);
1✔
1111
        return;
1✔
1112
      }
1113
      listenerSerializeExecutor.execute(
1✔
1114
          new Runnable() {
1✔
1115
            @Override
1116
            public void run() {
1117
              masterListener.messagesAvailable(producer);
1✔
1118
            }
1✔
1119
          });
1120
    }
1✔
1121

1122
    @Override
1123
    public void onReady() {
1124
      // FIXME(#7089): hedging case is broken.
1125
      if (!isReady()) {
1✔
1126
        return;
1✔
1127
      }
1128
      listenerSerializeExecutor.execute(
1✔
1129
          new Runnable() {
1✔
1130
            @Override
1131
            public void run() {
1132
              if (!isClosed) {
1✔
1133
                masterListener.onReady();
1✔
1134
              }
1135
            }
1✔
1136
          });
1137
    }
1✔
1138
  }
1139

1140
  private static final class State {
1141
    /** Committed and the winning substream drained. */
1142
    final boolean passThrough;
1143

1144
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1145
    @Nullable final List<BufferEntry> buffer;
1146

1147
    /**
1148
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1149
     * passThrough; Empty if committed but not passThrough.
1150
     */
1151
    final Collection<Substream> drainedSubstreams;
1152

1153
    /**
1154
     * Unmodifiable collection of all the active hedging substreams.
1155
     *
1156
     * <p>A substream even with the attribute substream.closed being true may be considered still
1157
     * "active" at the moment as long as it is in this collection.
1158
     */
1159
    final Collection<Substream> activeHedges; // not null once isHedging = true
1160

1161
    final int hedgingAttemptCount;
1162

1163
    /** Null until committed. */
1164
    @Nullable final Substream winningSubstream;
1165

1166
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1167
    final boolean cancelled;
1168

1169
    /** No more hedging due to events like drop or pushback. */
1170
    final boolean hedgingFrozen;
1171

1172
    State(
1173
        @Nullable List<BufferEntry> buffer,
1174
        Collection<Substream> drainedSubstreams,
1175
        Collection<Substream> activeHedges,
1176
        @Nullable Substream winningSubstream,
1177
        boolean cancelled,
1178
        boolean passThrough,
1179
        boolean hedgingFrozen,
1180
        int hedgingAttemptCount) {
1✔
1181
      this.buffer = buffer;
1✔
1182
      this.drainedSubstreams =
1✔
1183
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1184
      this.winningSubstream = winningSubstream;
1✔
1185
      this.activeHedges = activeHedges;
1✔
1186
      this.cancelled = cancelled;
1✔
1187
      this.passThrough = passThrough;
1✔
1188
      this.hedgingFrozen = hedgingFrozen;
1✔
1189
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1190

1191
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1192
      checkState(
1✔
1193
          !passThrough || winningSubstream != null,
1194
          "passThrough should imply winningSubstream != null");
1195
      checkState(
1✔
1196
          !passThrough
1197
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1198
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1199
          "passThrough should imply winningSubstream is drained");
1200
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1201
    }
1✔
1202

1203
    @CheckReturnValue
1204
    // GuardedBy RetriableStream.lock
1205
    State cancelled() {
1206
      return new State(
1✔
1207
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1208
          hedgingFrozen, hedgingAttemptCount);
1209
    }
1210

1211
    /** The given substream is drained. */
1212
    @CheckReturnValue
1213
    // GuardedBy RetriableStream.lock
1214
    State substreamDrained(Substream substream) {
1215
      checkState(!passThrough, "Already passThrough");
1✔
1216

1217
      Collection<Substream> drainedSubstreams;
1218
      
1219
      if (substream.closed) {
1✔
1220
        drainedSubstreams = this.drainedSubstreams;
1✔
1221
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1222
        // optimize for 0-retry, which is most of the cases.
1223
        drainedSubstreams = Collections.singletonList(substream);
1✔
1224
      } else {
1225
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1226
        drainedSubstreams.add(substream);
1✔
1227
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1228
      }
1229

1230
      boolean passThrough = winningSubstream != null;
1✔
1231

1232
      List<BufferEntry> buffer = this.buffer;
1✔
1233
      if (passThrough) {
1✔
1234
        checkState(
1✔
1235
            winningSubstream == substream, "Another RPC attempt has already committed");
1236
        buffer = null;
1✔
1237
      }
1238

1239
      return new State(
1✔
1240
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1241
          hedgingFrozen, hedgingAttemptCount);
1242
    }
1243

1244
    /** The given substream is closed. */
1245
    @CheckReturnValue
1246
    // GuardedBy RetriableStream.lock
1247
    State substreamClosed(Substream substream) {
1248
      substream.closed = true;
1✔
1249
      if (this.drainedSubstreams.contains(substream)) {
1✔
1250
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1251
        drainedSubstreams.remove(substream);
1✔
1252
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1253
        return new State(
1✔
1254
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1255
            hedgingFrozen, hedgingAttemptCount);
1256
      } else {
1257
        return this;
1✔
1258
      }
1259
    }
1260

1261
    @CheckReturnValue
1262
    // GuardedBy RetriableStream.lock
1263
    State committed(Substream winningSubstream) {
1264
      checkState(this.winningSubstream == null, "Already committed");
1✔
1265

1266
      boolean passThrough = false;
1✔
1267
      List<BufferEntry> buffer = this.buffer;
1✔
1268
      Collection<Substream> drainedSubstreams;
1269

1270
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1271
        passThrough = true;
1✔
1272
        buffer = null;
1✔
1273
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1274
      } else {
1275
        drainedSubstreams = Collections.emptyList();
1✔
1276
      }
1277

1278
      return new State(
1✔
1279
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1280
          hedgingFrozen, hedgingAttemptCount);
1281
    }
1282

1283
    @CheckReturnValue
1284
    // GuardedBy RetriableStream.lock
1285
    State freezeHedging() {
1286
      if (hedgingFrozen) {
1✔
1287
        return this;
×
1288
      }
1289
      return new State(
1✔
1290
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1291
          true, hedgingAttemptCount);
1292
    }
1293

1294
    @CheckReturnValue
1295
    // GuardedBy RetriableStream.lock
1296
    // state.hedgingAttemptCount is modified only here.
1297
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1298
    State addActiveHedge(Substream substream) {
1299
      // hasPotentialHedging must be true
1300
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1301
      checkState(winningSubstream == null, "already committed");
1✔
1302

1303
      Collection<Substream> activeHedges;
1304
      if (this.activeHedges == null) {
1✔
1305
        activeHedges = Collections.singleton(substream);
1✔
1306
      } else {
1307
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1308
        activeHedges.add(substream);
1✔
1309
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1310
      }
1311

1312
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1313
      return new State(
1✔
1314
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1315
          hedgingFrozen, hedgingAttemptCount);
1316
    }
1317

1318
    @CheckReturnValue
1319
    // GuardedBy RetriableStream.lock
1320
    // The method is only called in Sublistener.closed()
1321
    State removeActiveHedge(Substream substream) {
1322
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1323
      activeHedges.remove(substream);
1✔
1324
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1325

1326
      return new State(
1✔
1327
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1328
          hedgingFrozen, hedgingAttemptCount);
1329
    }
1330

1331
    @CheckReturnValue
1332
    // GuardedBy RetriableStream.lock
1333
    // The method is only called for transparent retry.
1334
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1335
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1336
      activeHedges.remove(oldOne);
1✔
1337
      activeHedges.add(newOne);
1✔
1338
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1339

1340
      return new State(
1✔
1341
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1342
          hedgingFrozen, hedgingAttemptCount);
1343
    }
1344
  }
1345

1346
  /**
1347
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1348
   * attributes: whether it is closed, whether it exceeded the buffer limit, and how many
   * attempts preceded it.
1349
   */
1350
  private static final class Substream {
1351
    // The underlying stream of this attempt. Not final and not set by the constructor.
    // NOTE(review): presumably assigned when the substream is created -- confirm at the call site.
    ClientStream stream;
1352

1353
    // GuardedBy RetriableStream.lock
    // Set to true by State.substreamClosed().
1354
    boolean closed;
1355

1356
    // setting to true must be GuardedBy RetriableStream.lock
    // Set by BufferSizeTracer.outboundWireSize() when a buffer limit is exceeded.
1357
    boolean bufferLimitExceeded;
1358

1359
    // Number of attempts made before this one; written into the GRPC_PREVIOUS_RPC_ATTEMPTS
    // header when positive and compared against retryPolicy.maxAttempts.
    final int previousAttemptCount;
1360

1361
    Substream(int previousAttemptCount) {
1✔
1362
      this.previousAttemptCount = previousAttemptCount;
1✔
1363
    }
1✔
1364
  }
1365

1366

1367
  /**
1368
   * Traces the buffer used by a substream.
1369
   */
1370
  class BufferSizeTracer extends ClientStreamTracer {
1371
    // Each buffer size tracer is dedicated to one specific substream.
1372
    private final Substream substream;
1373

1374
    @GuardedBy("lock")
1375
    long bufferNeeded;
1376

1377
    BufferSizeTracer(Substream substream) {
1✔
1378
      this.substream = substream;
1✔
1379
    }
1✔
1380

1381
    /**
1382
     * A message is sent to the wire, so its reference would be released if no retry or
1383
     * hedging were involved. So at this point we have to hold the reference of the message longer
1384
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1385
     */
1386
    @Override
1387
    public void outboundWireSize(long bytes) {
1388
      if (state.winningSubstream != null) {
1✔
1389
        return;
1✔
1390
      }
1391

1392
      Runnable postCommitTask = null;
1✔
1393

1394
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1395
      synchronized (lock) {
1✔
1396
        if (state.winningSubstream != null || substream.closed) {
1✔
1397
          return;
×
1398
        }
1399
        bufferNeeded += bytes;
1✔
1400
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1401
          return;
1✔
1402
        }
1403

1404
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1405
          substream.bufferLimitExceeded = true;
1✔
1406
        } else {
1407
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1408
          long savedChannelBufferUsed =
1✔
1409
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1410
          perRpcBufferUsed = bufferNeeded;
1✔
1411

1412
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1413
            substream.bufferLimitExceeded = true;
1✔
1414
          }
1415
        }
1416

1417
        if (substream.bufferLimitExceeded) {
1✔
1418
          postCommitTask = commit(substream);
1✔
1419
        }
1420
      }
1✔
1421

1422
      if (postCommitTask != null) {
1✔
1423
        postCommitTask.run();
1✔
1424
      }
1425
    }
1✔
1426
  }
1427

1428
  /**
1429
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1430
   *  the Channel. There should be a single instance of it for each channel.
1431
   */
1432
  static final class ChannelBufferMeter {
1✔
1433
    // Running total of buffered bytes; atomic so it can be updated without a lock on the meter.
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1434

1435
    /**
     * Atomically adds {@code newBytesUsed} to the running total.
     *
     * @return the total after the addition
     */
    @VisibleForTesting
1436
    long addAndGet(long newBytesUsed) {
1437
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1438
    }
1439
  }
1440

1441
  /**
1442
   * Used for retry throttling.
1443
   */
1444
  static final class Throttle {
1445

1446
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1447

1448
    /**
1449
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1450
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1451
     * maxTokens.
1452
     */
1453
    final int maxTokens;
1454

1455
    /**
1456
     * Half of {@code maxTokens}.
1457
     */
1458
    final int threshold;
1459

1460
    /**
1461
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1462
     */
1463
    final int tokenRatio;
1464

1465
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1466

1467
    Throttle(float maxTokens, float tokenRatio) {
1✔
1468
      // tokenRatio is up to 3 decimal places
1469
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1470
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1471
      this.threshold = this.maxTokens / 2;
1✔
1472
      tokenCount.set(this.maxTokens);
1✔
1473
    }
1✔
1474

1475
    @VisibleForTesting
1476
    boolean isAboveThreshold() {
1477
      return tokenCount.get() > threshold;
1✔
1478
    }
1479

1480
    /**
1481
     * Counts down the token on qualified failure and checks if it is above the threshold
1482
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1483
     * a not-to-retry pushback.
1484
     */
1485
    @VisibleForTesting
1486
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1487
      while (true) {
1488
        int currentCount = tokenCount.get();
1✔
1489
        if (currentCount == 0) {
1✔
1490
          return false;
1✔
1491
        }
1492
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1493
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1494
        if (updated) {
1✔
1495
          return decremented > threshold;
1✔
1496
        }
1497
      }
×
1498
    }
1499

1500
    @VisibleForTesting
1501
    void onSuccess() {
1502
      while (true) {
1503
        int currentCount = tokenCount.get();
1✔
1504
        if (currentCount == maxTokens) {
1✔
1505
          break;
1✔
1506
        }
1507
        int incremented = currentCount + tokenRatio;
1✔
1508
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1509
        if (updated) {
1✔
1510
          break;
1✔
1511
        }
1512
      }
×
1513
    }
1✔
1514

1515
    @Override
1516
    public boolean equals(Object o) {
1517
      if (this == o) {
1✔
1518
        return true;
×
1519
      }
1520
      if (!(o instanceof Throttle)) {
1✔
1521
        return false;
×
1522
      }
1523
      Throttle that = (Throttle) o;
1✔
1524
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1525
    }
1526

1527
    @Override
1528
    public int hashCode() {
1529
      return Objects.hashCode(maxTokens, tokenRatio);
×
1530
    }
1531
  }
1532

1533
  private static final class RetryPlan {
1534
    final boolean shouldRetry;
1535
    final long backoffNanos;
1536

1537
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1538
      this.shouldRetry = shouldRetry;
1✔
1539
      this.backoffNanos = backoffNanos;
1✔
1540
    }
1✔
1541
  }
1542

1543
  private static final class HedgingPlan {
1544
    final boolean isHedgeable;
1545
    @Nullable
1546
    final Integer hedgingPushbackMillis;
1547

1548
    public HedgingPlan(
1549
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1550
      this.isHedgeable = isHedgeable;
1✔
1551
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1552
    }
1✔
1553
  }
1554

1555
  /** Allows cancelling a Future without racing with setting the future. */
1556
  private static final class FutureCanceller {
1557

1558
    final Object lock;
1559
    @GuardedBy("lock")
1560
    Future<?> future;
1561
    @GuardedBy("lock")
1562
    boolean cancelled;
1563

1564
    FutureCanceller(Object lock) {
1✔
1565
      this.lock = lock;
1✔
1566
    }
1✔
1567

1568
    void setFuture(Future<?> future) {
1569
      synchronized (lock) {
1✔
1570
        if (!cancelled) {
1✔
1571
          this.future = future;
1✔
1572
        }
1573
      }
1✔
1574
    }
1✔
1575

1576
    @GuardedBy("lock")
1577
    @CheckForNull // Must cancel the returned future if not null.
1578
    Future<?> markCancelled() {
1579
      cancelled = true;
1✔
1580
      return future;
1✔
1581
    }
1582

1583
    @GuardedBy("lock")
1584
    boolean isCancelled() {
1585
      return cancelled;
1✔
1586
    }
1587
  }
1588
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc