• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19674

31 Jan 2025 12:10AM CUT coverage: 88.56% (-0.03%) from 88.586%
#19674

push

github

ejona86
alts: Add ClientCall support to AltsContextUtil

This adds a createFrom(Attributes) to mirror the check(Attributes) added
in ba8ab79. It also adds conveniences for ClientCall for both
createFrom() and check(). This allows getting peer information from
ClientCall and CallCredentials.RequestInfo, as was already available
from ServerCall.

The tests were reworked to test the Attribute-based methods and then
only basic tests for client/server.

Fixes #11042

33752 of 38112 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.03
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.Compressor;
30
import io.grpc.Deadline;
31
import io.grpc.DecompressorRegistry;
32
import io.grpc.Metadata;
33
import io.grpc.MethodDescriptor;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import java.io.InputStream;
38
import java.lang.Thread.UncaughtExceptionHandler;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.List;
43
import java.util.Random;
44
import java.util.concurrent.Executor;
45
import java.util.concurrent.Future;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.concurrent.atomic.AtomicInteger;
50
import java.util.concurrent.atomic.AtomicLong;
51
import javax.annotation.CheckForNull;
52
import javax.annotation.Nullable;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  @VisibleForTesting
57
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
1✔
58
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
1✔
59

60
  @VisibleForTesting
61
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
1✔
62
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
1✔
63

64
  private static final Status CANCELLED_BECAUSE_COMMITTED =
1✔
65
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
1✔
66

67
  private final MethodDescriptor<ReqT, ?> method;
68
  private final Executor callExecutor;
69
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
1✔
70
      new UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          throw Status.fromThrowable(e)
×
74
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
×
75
              .asRuntimeException();
×
76
        }
77
      }
78
  );
79
  private final ScheduledExecutorService scheduledExecutorService;
80
  // Must not modify it.
81
  private final Metadata headers;
82
  @Nullable
83
  private final RetryPolicy retryPolicy;
84
  @Nullable
85
  private final HedgingPolicy hedgingPolicy;
86
  private final boolean isHedging;
87

88
  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89
  private final Object lock = new Object();
1✔
90

91
  private final ChannelBufferMeter channelBufferUsed;
92
  private final long perRpcBufferLimit;
93
  private final long channelBufferLimit;
94
  @Nullable
95
  private final Throttle throttle;
96
  @GuardedBy("lock")
1✔
97
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98

99
  private volatile State state = new State(
1✔
100
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
1✔
101
      false, 0);
102

103
  /**
104
   * Either non-local transparent retry happened or reached server's application logic.
105
   *
106
   * <p>Note that local-only transparent retries are unlimited.
107
   */
108
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
1✔
109
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
1✔
110
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
1✔
111
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112

113
  // Used for recording the share of buffer used for the current call out of the channel buffer.
114
  // This field would not be necessary if there is no channel buffer limit.
115
  @GuardedBy("lock")
116
  private long perRpcBufferUsed;
117

118
  private ClientStreamListener masterListener;
119
  @GuardedBy("lock")
120
  private FutureCanceller scheduledRetry;
121
  @GuardedBy("lock")
122
  private FutureCanceller scheduledHedging;
123
  private long nextBackoffIntervalNanos;
124
  private Status cancellationStatus;
125
  private boolean isClosed;
126

127
  RetriableStream(
128
      MethodDescriptor<ReqT, ?> method, Metadata headers,
129
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
130
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
131
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
132
      @Nullable Throttle throttle) {
1✔
133
    this.method = method;
1✔
134
    this.channelBufferUsed = channelBufferUsed;
1✔
135
    this.perRpcBufferLimit = perRpcBufferLimit;
1✔
136
    this.channelBufferLimit = channelBufferLimit;
1✔
137
    this.callExecutor = callExecutor;
1✔
138
    this.scheduledExecutorService = scheduledExecutorService;
1✔
139
    this.headers = headers;
1✔
140
    this.retryPolicy = retryPolicy;
1✔
141
    if (retryPolicy != null) {
1✔
142
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
143
    }
144
    this.hedgingPolicy = hedgingPolicy;
1✔
145
    checkArgument(
1✔
146
        retryPolicy == null || hedgingPolicy == null,
147
        "Should not provide both retryPolicy and hedgingPolicy");
148
    this.isHedging = hedgingPolicy != null;
1✔
149
    this.throttle = throttle;
1✔
150
  }
1✔
151

152
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153
  @Nullable // null if already committed
154
  @CheckReturnValue
155
  private Runnable commit(final Substream winningSubstream) {
156
    synchronized (lock) {
1✔
157
      if (state.winningSubstream != null) {
1✔
158
        return null;
1✔
159
      }
160
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
1✔
161

162
      state = state.committed(winningSubstream);
1✔
163

164
      // subtract the share of this RPC from channelBufferUsed.
165
      channelBufferUsed.addAndGet(-perRpcBufferUsed);
1✔
166

167
      final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
1✔
168
      final Future<?> retryFuture;
169
      if (scheduledRetry != null) {
1✔
170
        retryFuture = scheduledRetry.markCancelled();
1✔
171
        scheduledRetry = null;
1✔
172
      } else {
173
        retryFuture = null;
1✔
174
      }
175
      // cancel the scheduled hedging if it is scheduled prior to the commitment
176
      final Future<?> hedgingFuture;
177
      if (scheduledHedging != null) {
1✔
178
        hedgingFuture = scheduledHedging.markCancelled();
1✔
179
        scheduledHedging = null;
1✔
180
      } else {
181
        hedgingFuture = null;
1✔
182
      }
183

184
      class CommitTask implements Runnable {
1✔
185
        @Override
186
        public void run() {
187
          // For hedging only, not needed for normal retry
188
          for (Substream substream : savedDrainedSubstreams) {
1✔
189
            if (substream != winningSubstream) {
1✔
190
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
1✔
191
            }
192
          }
1✔
193
          if (retryFuture != null) {
1✔
194
            retryFuture.cancel(false);
1✔
195
            if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
196
              assert savedCloseMasterListenerReason != null;
×
197
              listenerSerializeExecutor.execute(
×
198
                  new Runnable() {
×
199
                    @Override
200
                    public void run() {
201
                      isClosed = true;
×
202
                      masterListener.closed(savedCloseMasterListenerReason.status,
×
203
                          savedCloseMasterListenerReason.progress,
×
204
                          savedCloseMasterListenerReason.metadata);
×
205
                    }
×
206
                  });
207
            }
208
          }
209

210
          if (hedgingFuture != null) {
1✔
211
            hedgingFuture.cancel(false);
1✔
212
          }
213

214
          postCommit();
1✔
215
        }
1✔
216
      }
217

218
      return new CommitTask();
1✔
219
    }
220
  }
221

222
  abstract void postCommit();
223

224
  /**
225
   * Calls commit() and if successful runs the post commit task. Post commit task will be non-null
226
   * for only once. The post commit task cancels other non-winning streams on separate transport
227
   * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream
228
   * transports.(issues/10314)
229
   * This method should be called only in subListener callbacks. This guarantees callExecutor
230
   * schedules tasks before master listener closes, which is protected by the inFlightSubStreams
231
   * decorative. That is because:
232
   * For a successful winning stream, other streams won't attempt to close master listener.
233
   * For a cancelled winning stream (noop), other stream won't attempt to close master listener.
234
   * For a failed/closed winning stream, the last closed stream closes the master listener, and
235
   * callExecutor scheduling happens-before that.
236
   */
237
  private void commitAndRun(Substream winningSubstream) {
238
    Runnable postCommitTask = commit(winningSubstream);
1✔
239

240
    if (postCommitTask != null) {
1✔
241
      callExecutor.execute(postCommitTask);
1✔
242
    }
243
  }
1✔
244

245
  // returns null means we should not create new sub streams, e.g. cancelled or
246
  // other close condition is met for retriableStream.
247
  @Nullable
248
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
249
    int inFlight;
250
    do {
251
      inFlight = inFlightSubStreams.get();
1✔
252
      if (inFlight < 0) {
1✔
253
        return null;
×
254
      }
255
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
256
    Substream sub = new Substream(previousAttemptCount);
1✔
257
    // one tracer per substream
258
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
259
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
260
      @Override
261
      public ClientStreamTracer newClientStreamTracer(
262
          ClientStreamTracer.StreamInfo info, Metadata headers) {
263
        return bufferSizeTracer;
1✔
264
      }
265
    };
266

267
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
268
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
269
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
1✔
270
    return sub;
1✔
271
  }
272

273
  /**
274
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
275
   * Client stream is not yet started.
276
   */
277
  abstract ClientStream newSubstream(
278
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
279
      boolean isTransparentRetry);
280

281
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
282
  @VisibleForTesting
283
  final Metadata updateHeaders(
284
      Metadata originalHeaders, int previousAttemptCount) {
285
    Metadata newHeaders = new Metadata();
1✔
286
    newHeaders.merge(originalHeaders);
1✔
287
    if (previousAttemptCount > 0) {
1✔
288
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
289
    }
290
    return newHeaders;
1✔
291
  }
292

293
  private void drain(Substream substream) {
294
    int index = 0;
1✔
295
    int chunk = 0x80;
1✔
296
    List<BufferEntry> list = null;
1✔
297
    boolean streamStarted = false;
1✔
298
    Runnable onReadyRunnable = null;
1✔
299

300
    while (true) {
301
      State savedState;
302

303
      synchronized (lock) {
1✔
304
        savedState = state;
1✔
305
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
306
          // committed but not me, to be cancelled
307
          break;
1✔
308
        }
309
        if (savedState.cancelled) {
1✔
310
          break;
1✔
311
        }
312
        if (index == savedState.buffer.size()) { // I'm drained
1✔
313
          state = savedState.substreamDrained(substream);
1✔
314
          if (!isReady()) {
1✔
315
            return;
1✔
316
          }
317
          onReadyRunnable = new Runnable() {
1✔
318
            @Override
319
            public void run() {
320
              if (!isClosed) {
1✔
321
                masterListener.onReady();
1✔
322
              }
323
            }
1✔
324
          };
325
          break;
1✔
326
        }
327

328
        if (substream.closed) {
1✔
329
          return;
×
330
        }
331

332
        int stop = Math.min(index + chunk, savedState.buffer.size());
1✔
333
        if (list == null) {
1✔
334
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
1✔
335
        } else {
336
          list.clear();
1✔
337
          list.addAll(savedState.buffer.subList(index, stop));
1✔
338
        }
339
        index = stop;
1✔
340
      }
1✔
341

342
      for (BufferEntry bufferEntry : list) {
1✔
343
        bufferEntry.runWith(substream);
1✔
344
        if (bufferEntry instanceof RetriableStream.StartEntry) {
1✔
345
          streamStarted = true;
1✔
346
        }
347
        savedState = state;
1✔
348
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
349
          // committed but not me, to be cancelled
350
          break;
1✔
351
        }
352
        if (savedState.cancelled) {
1✔
353
          break;
1✔
354
        }
355
      }
1✔
356
    }
1✔
357

358
    if (onReadyRunnable != null) {
1✔
359
      listenerSerializeExecutor.execute(onReadyRunnable);
1✔
360
      return;
1✔
361
    }
362

363
    if (!streamStarted) {
1✔
364
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
365
      substream.stream.start(new Sublistener(substream));
1✔
366
    }
367
    substream.stream.cancel(
1✔
368
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
1✔
369
  }
1✔
370

371
  /**
372
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
373
   */
374
  @CheckReturnValue
375
  @Nullable
376
  abstract Status prestart();
377

378
  class StartEntry implements BufferEntry {
1✔
379
    @Override
380
    public void runWith(Substream substream) {
381
      substream.stream.start(new Sublistener(substream));
1✔
382
    }
1✔
383
  }
384

385
  /** Starts the first PRC attempt. */
386
  @Override
387
  public final void start(ClientStreamListener listener) {
388
    masterListener = listener;
1✔
389

390
    Status shutdownStatus = prestart();
1✔
391

392
    if (shutdownStatus != null) {
1✔
393
      cancel(shutdownStatus);
1✔
394
      return;
1✔
395
    }
396

397
    synchronized (lock) {
1✔
398
      state.buffer.add(new StartEntry());
1✔
399
    }
1✔
400

401
    Substream substream = createSubstream(0, false);
1✔
402
    if (substream == null) {
1✔
403
      return;
×
404
    }
405
    if (isHedging) {
1✔
406
      FutureCanceller scheduledHedgingRef = null;
1✔
407

408
      synchronized (lock) {
1✔
409
        state = state.addActiveHedge(substream);
1✔
410
        if (hasPotentialHedging(state)
1✔
411
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
412
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
413
        }
414
      }
1✔
415

416
      if (scheduledHedgingRef != null) {
1✔
417
        scheduledHedgingRef.setFuture(
1✔
418
            scheduledExecutorService.schedule(
1✔
419
                new HedgingRunnable(scheduledHedgingRef),
420
                hedgingPolicy.hedgingDelayNanos,
421
                TimeUnit.NANOSECONDS));
422
      }
423
    }
424

425
    drain(substream);
1✔
426
  }
1✔
427

428
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
429
  private void pushbackHedging(@Nullable Integer delayMillis) {
430
    if (delayMillis == null) {
1✔
431
      return;
1✔
432
    }
433
    if (delayMillis < 0) {
1✔
434
      freezeHedging();
1✔
435
      return;
1✔
436
    }
437

438
    // Cancels the current scheduledHedging and reschedules a new one.
439
    FutureCanceller future;
440
    Future<?> futureToBeCancelled;
441

442
    synchronized (lock) {
1✔
443
      if (scheduledHedging == null) {
1✔
444
        return;
×
445
      }
446

447
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
448
      scheduledHedging = future = new FutureCanceller(lock);
1✔
449
    }
1✔
450

451
    if (futureToBeCancelled != null) {
1✔
452
      futureToBeCancelled.cancel(false);
1✔
453
    }
454
    future.setFuture(scheduledExecutorService.schedule(
1✔
455
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
456
  }
1✔
457

458
  private final class HedgingRunnable implements Runnable {
459

460
    // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
461
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
462
    final FutureCanceller scheduledHedgingRef;
463

464
    HedgingRunnable(FutureCanceller scheduledHedging) {
1✔
465
      scheduledHedgingRef = scheduledHedging;
1✔
466
    }
1✔
467

468
    @Override
469
    public void run() {
470
      // It's safe to read state.hedgingAttemptCount here.
471
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
472
      // until state.addActiveHedge() is called subsequently, even the state could possibly
473
      // change.
474
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
1✔
475
      if (newSubstream == null) {
1✔
476
        return;
×
477
      }
478
      callExecutor.execute(
1✔
479
          new Runnable() {
1✔
480
            @SuppressWarnings("GuardedBy")  //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
481
            @Override
482
            public void run() {
483
              boolean cancelled = false;
1✔
484
              FutureCanceller future = null;
1✔
485

486
              synchronized (lock) {
1✔
487
                if (scheduledHedgingRef.isCancelled()) {
1✔
488
                  cancelled = true;
×
489
                } else {
490
                  state = state.addActiveHedge(newSubstream);
1✔
491
                  if (hasPotentialHedging(state)
1✔
492
                      && (throttle == null || throttle.isAboveThreshold())) {
1✔
493
                    scheduledHedging = future = new FutureCanceller(lock);
1✔
494
                  } else {
495
                    state = state.freezeHedging();
1✔
496
                    scheduledHedging = null;
1✔
497
                  }
498
                }
499
              }
1✔
500

501
              if (cancelled) {
1✔
502
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
503
                newSubstream.stream.start(new Sublistener(newSubstream));
×
504
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
×
505
                return;
×
506
              }
507
              if (future != null) {
1✔
508
                future.setFuture(
1✔
509
                    scheduledExecutorService.schedule(
1✔
510
                        new HedgingRunnable(future),
511
                        hedgingPolicy.hedgingDelayNanos,
1✔
512
                        TimeUnit.NANOSECONDS));
513
              }
514
              drain(newSubstream);
1✔
515
            }
1✔
516
          });
517
    }
1✔
518
  }
519

520
  @Override
521
  public final void cancel(final Status reason) {
522
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
1✔
523
    noopSubstream.stream = new NoopClientStream();
1✔
524
    Runnable runnable = commit(noopSubstream);
1✔
525

526
    if (runnable != null) {
1✔
527
      synchronized (lock) {
1✔
528
        state = state.substreamDrained(noopSubstream);
1✔
529
      }
1✔
530
      runnable.run();
1✔
531
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
1✔
532
      return;
1✔
533
    }
534

535
    Substream winningSubstreamToCancel = null;
1✔
536
    synchronized (lock) {
1✔
537
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
1✔
538
        winningSubstreamToCancel = state.winningSubstream;
1✔
539
      } else { // the winningSubstream will be cancelled while draining
540
        cancellationStatus = reason;
1✔
541
      }
542
      state = state.cancelled();
1✔
543
    }
1✔
544
    if (winningSubstreamToCancel != null) {
1✔
545
      winningSubstreamToCancel.stream.cancel(reason);
1✔
546
    }
547
  }
1✔
548

549
  private void delayOrExecute(BufferEntry bufferEntry) {
550
    Collection<Substream> savedDrainedSubstreams;
551
    synchronized (lock) {
1✔
552
      if (!state.passThrough) {
1✔
553
        state.buffer.add(bufferEntry);
1✔
554
      }
555
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
556
    }
1✔
557

558
    for (Substream substream : savedDrainedSubstreams) {
1✔
559
      bufferEntry.runWith(substream);
1✔
560
    }
1✔
561
  }
1✔
562

563
  /**
564
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
565
   * InputStream for buffering.
566
   */
567
  @Override
568
  public final void writeMessage(InputStream message) {
569
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
×
570
  }
571

572
  final void sendMessage(final ReqT message) {
573
    State savedState = state;
1✔
574
    if (savedState.passThrough) {
1✔
575
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
576
      return;
1✔
577
    }
578

579
    class SendMessageEntry implements BufferEntry {
1✔
580
      @Override
581
      public void runWith(Substream substream) {
582
        substream.stream.writeMessage(method.streamRequest(message));
1✔
583
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
584
        // flushes (or half close), but retry appears to have a code path that the flushes may
585
        // not happen. The code needs to be fixed and this removed. See #9340.
586
        substream.stream.flush();
1✔
587
      }
1✔
588
    }
589

590
    delayOrExecute(new SendMessageEntry());
1✔
591
  }
1✔
592

593
  @Override
594
  public final void request(final int numMessages) {
595
    State savedState = state;
1✔
596
    if (savedState.passThrough) {
1✔
597
      savedState.winningSubstream.stream.request(numMessages);
1✔
598
      return;
1✔
599
    }
600

601
    class RequestEntry implements BufferEntry {
1✔
602
      @Override
603
      public void runWith(Substream substream) {
604
        substream.stream.request(numMessages);
1✔
605
      }
1✔
606
    }
607

608
    delayOrExecute(new RequestEntry());
1✔
609
  }
1✔
610

611
  @Override
612
  public final void flush() {
613
    State savedState = state;
1✔
614
    if (savedState.passThrough) {
1✔
615
      savedState.winningSubstream.stream.flush();
1✔
616
      return;
1✔
617
    }
618

619
    class FlushEntry implements BufferEntry {
1✔
620
      @Override
621
      public void runWith(Substream substream) {
622
        substream.stream.flush();
1✔
623
      }
1✔
624
    }
625

626
    delayOrExecute(new FlushEntry());
1✔
627
  }
1✔
628

629
  @Override
630
  public final boolean isReady() {
631
    for (Substream substream : state.drainedSubstreams) {
1✔
632
      if (substream.stream.isReady()) {
1✔
633
        return true;
1✔
634
      }
635
    }
1✔
636
    return false;
1✔
637
  }
638

639
  @Override
640
  public void optimizeForDirectExecutor() {
641
    class OptimizeDirectEntry implements BufferEntry {
1✔
642
      @Override
643
      public void runWith(Substream substream) {
644
        substream.stream.optimizeForDirectExecutor();
1✔
645
      }
1✔
646
    }
647

648
    delayOrExecute(new OptimizeDirectEntry());
1✔
649
  }
1✔
650

651
  @Override
652
  public final void setCompressor(final Compressor compressor) {
653
    class CompressorEntry implements BufferEntry {
1✔
654
      @Override
655
      public void runWith(Substream substream) {
656
        substream.stream.setCompressor(compressor);
1✔
657
      }
1✔
658
    }
659

660
    delayOrExecute(new CompressorEntry());
1✔
661
  }
1✔
662

663
  @Override
664
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
665
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
666
      @Override
667
      public void runWith(Substream substream) {
668
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
669
      }
1✔
670
    }
671

672
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
673
  }
1✔
674

675
  @Override
676
  public final void setMessageCompression(final boolean enable) {
677
    class MessageCompressionEntry implements BufferEntry {
1✔
678
      @Override
679
      public void runWith(Substream substream) {
680
        substream.stream.setMessageCompression(enable);
1✔
681
      }
1✔
682
    }
683

684
    delayOrExecute(new MessageCompressionEntry());
1✔
685
  }
1✔
686

687
  @Override
688
  public final void halfClose() {
689
    class HalfCloseEntry implements BufferEntry {
1✔
690
      @Override
691
      public void runWith(Substream substream) {
692
        substream.stream.halfClose();
1✔
693
      }
1✔
694
    }
695

696
    delayOrExecute(new HalfCloseEntry());
1✔
697
  }
1✔
698

699
  @Override
700
  public final void setAuthority(final String authority) {
701
    class AuthorityEntry implements BufferEntry {
1✔
702
      @Override
703
      public void runWith(Substream substream) {
704
        substream.stream.setAuthority(authority);
1✔
705
      }
1✔
706
    }
707

708
    delayOrExecute(new AuthorityEntry());
1✔
709
  }
1✔
710

711
  @Override
712
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
713
    class DecompressorRegistryEntry implements BufferEntry {
1✔
714
      @Override
715
      public void runWith(Substream substream) {
716
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
717
      }
1✔
718
    }
719

720
    delayOrExecute(new DecompressorRegistryEntry());
1✔
721
  }
1✔
722

723
  @Override
724
  public final void setMaxInboundMessageSize(final int maxSize) {
725
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
726
      @Override
727
      public void runWith(Substream substream) {
728
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
729
      }
1✔
730
    }
731

732
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
733
  }
1✔
734

735
  @Override
736
  public final void setMaxOutboundMessageSize(final int maxSize) {
737
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
738
      @Override
739
      public void runWith(Substream substream) {
740
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
741
      }
1✔
742
    }
743

744
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
745
  }
1✔
746

747
  @Override
748
  public final void setDeadline(final Deadline deadline) {
749
    class DeadlineEntry implements BufferEntry {
1✔
750
      @Override
751
      public void runWith(Substream substream) {
752
        substream.stream.setDeadline(deadline);
1✔
753
      }
1✔
754
    }
755

756
    delayOrExecute(new DeadlineEntry());
1✔
757
  }
1✔
758

759
  @Override
760
  public final Attributes getAttributes() {
761
    if (state.winningSubstream != null) {
1✔
762
      return state.winningSubstream.stream.getAttributes();
1✔
763
    }
764
    return Attributes.EMPTY;
×
765
  }
766

767
  @Override
768
  public void appendTimeoutInsight(InsightBuilder insight) {
769
    State currentState;
770
    synchronized (lock) {
1✔
771
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
772
      currentState = state;
1✔
773
    }
1✔
774
    if (currentState.winningSubstream != null) {
1✔
775
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
776
      // of the winning substream, they may not have received closed() notifications yet, thus they
777
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
778
      // We need to figure out how to include them.
779
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
780
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
781
      insight.appendKeyValue("committed", substreamInsight);
1✔
782
    } else {
1✔
783
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
784
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
785
      // and are still catching up with buffered requests (in other words, still draining) will not
786
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
787
      // be indistinguishable from the case where those streams are to be created a little late due
788
      // to delays in the timer.
789
      for (Substream sub : currentState.drainedSubstreams) {
1✔
790
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
791
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
792
        openSubstreamsInsight.append(substreamInsight);
1✔
793
      }
1✔
794
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
795
    }
796
  }
1✔
797

798
  private static Random random = new Random();
1✔
799

800
  @VisibleForTesting
801
  static void setRandom(Random random) {
802
    RetriableStream.random = random;
1✔
803
  }
1✔
804

805
  /**
806
   * Whether there is any potential hedge at the moment. A false return value implies there is
807
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
808
   * when calling this method, unless otherwise the rpc is committed.
809
   */
810
  // only called when isHedging is true
811
  @GuardedBy("lock")
812
  private boolean hasPotentialHedging(State state) {
813
    return state.winningSubstream == null
1✔
814
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
815
        && !state.hedgingFrozen;
816
  }
817

818
  @SuppressWarnings("GuardedBy")  // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
819
  private void freezeHedging() {
820
    Future<?> futureToBeCancelled = null;
1✔
821
    synchronized (lock) {
1✔
822
      if (scheduledHedging != null) {
1✔
823
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
824
        scheduledHedging = null;
1✔
825
      }
826
      state = state.freezeHedging();
1✔
827
    }
1✔
828

829
    if (futureToBeCancelled != null) {
1✔
830
      futureToBeCancelled.cancel(false);
1✔
831
    }
832
  }
1✔
833

834
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
835
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
836
        metadata);
837
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
838
      listenerSerializeExecutor.execute(
1✔
839
          new Runnable() {
1✔
840
            @Override
841
            public void run() {
842
              isClosed = true;
1✔
843
              masterListener.closed(status, progress, metadata);
1✔
844
            }
1✔
845
          });
846
    }
847
  }
1✔
848

849
  private static final class SavedCloseMasterListenerReason {
850
    private final Status status;
851
    private final RpcProgress progress;
852
    private final Metadata metadata;
853

854
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
855
      this.status = status;
1✔
856
      this.progress = progress;
1✔
857
      this.metadata = metadata;
1✔
858
    }
1✔
859
  }
860

861
  private interface BufferEntry {
862
    /** Replays the buffer entry with the given stream. */
863
    void runWith(Substream substream);
864
  }
865

866
  private final class Sublistener implements ClientStreamListener {
1✔
867
    final Substream substream;
868

869
    Sublistener(Substream substream) {
1✔
870
      this.substream = substream;
1✔
871
    }
1✔
872

873
    @Override
874
    public void headersRead(final Metadata headers) {
875
      if (substream.previousAttemptCount > 0) {
1✔
876
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
877
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
878
      }
879
      commitAndRun(substream);
1✔
880
      if (state.winningSubstream == substream) {
1✔
881
        if (throttle != null) {
1✔
882
          throttle.onSuccess();
1✔
883
        }
884
        listenerSerializeExecutor.execute(
1✔
885
            new Runnable() {
1✔
886
              @Override
887
              public void run() {
888
                masterListener.headersRead(headers);
1✔
889
              }
1✔
890
            });
891
      }
892
    }
1✔
893

894
    @Override
895
    public void closed(
896
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
897
      synchronized (lock) {
1✔
898
        state = state.substreamClosed(substream);
1✔
899
        closedSubstreamsInsight.append(status.getCode());
1✔
900
      }
1✔
901

902
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
903
        assert savedCloseMasterListenerReason != null;
1✔
904
        listenerSerializeExecutor.execute(
1✔
905
            new Runnable() {
1✔
906
              @Override
907
              public void run() {
908
                isClosed = true;
1✔
909
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
910
                    savedCloseMasterListenerReason.progress,
1✔
911
                    savedCloseMasterListenerReason.metadata);
1✔
912
              }
1✔
913
            });
914
        return;
1✔
915
      }
916

917
      // handle a race between buffer limit exceeded and closed, when setting
918
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
919
      if (substream.bufferLimitExceeded) {
1✔
920
        commitAndRun(substream);
1✔
921
        if (state.winningSubstream == substream) {
1✔
922
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
923
        }
924
        return;
1✔
925
      }
926
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
927
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
928
        commitAndRun(substream);
1✔
929
        if (state.winningSubstream == substream) {
1✔
930
          Status tooManyTransparentRetries = Status.INTERNAL
1✔
931
              .withDescription("Too many transparent retries. Might be a bug in gRPC")
1✔
932
              .withCause(status.asRuntimeException());
1✔
933
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
934
        }
935
        return;
1✔
936
      }
937

938
      if (state.winningSubstream == null) {
1✔
939
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
940
            || (rpcProgress == RpcProgress.REFUSED
941
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
942
          // transparent retry
943
          final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
1✔
944
          if (newSubstream == null) {
1✔
945
            return;
×
946
          }
947
          if (isHedging) {
1✔
948
            synchronized (lock) {
1✔
949
              // Although this operation is not done atomically with
950
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
951
              // activeHedges, so neither does it affect the commitment decision of other threads,
952
              // nor do the commitment decision making threads affect itself.
953
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
954
            }
1✔
955
          }
956

957
          callExecutor.execute(new Runnable() {
1✔
958
            @Override
959
            public void run() {
960
              drain(newSubstream);
1✔
961
            }
1✔
962
          });
963
          return;
1✔
964
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
965
          // For normal retry, nothing need be done here, will just commit.
966
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
967
          if (isHedging) {
1✔
968
            freezeHedging();
×
969
          }
970
        } else {
971
          noMoreTransparentRetry.set(true);
1✔
972

973
          if (isHedging) {
1✔
974
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
975
            if (hedgingPlan.isHedgeable) {
1✔
976
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
977
            }
978
            synchronized (lock) {
1✔
979
              state = state.removeActiveHedge(substream);
1✔
980
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
981
              // Once hasPotentialHedging(state) is false, it will always be false, and then
982
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
983
              // multiple concurrent hedges, one of the hedges will end up committed.
984
              if (hedgingPlan.isHedgeable) {
1✔
985
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
986
                  return;
1✔
987
                }
988
                // else, no activeHedges, no new hedges possible, try to commit
989
              } // else, isHedgeable is false, try to commit
990
            }
1✔
991
          } else {
1✔
992
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
993
            if (retryPlan.shouldRetry) {
1✔
994
              // retry
995
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
1✔
996
              if (newSubstream == null) {
1✔
997
                return;
×
998
              }
999
              // The check state.winningSubstream == null, checking if is not already committed, is
1000
              // racy, but is still safe b/c the retry will also handle committed/cancellation
1001
              FutureCanceller scheduledRetryCopy;
1002
              synchronized (lock) {
1✔
1003
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
1004
              }
1✔
1005

1006
              class RetryBackoffRunnable implements Runnable {
1✔
1007
                @Override
1008
                @SuppressWarnings("FutureReturnValueIgnored")
1009
                public void run() {
1010
                  synchronized (scheduledRetryCopy.lock) {
1✔
1011
                    if (scheduledRetryCopy.isCancelled()) {
1✔
1012
                      return;
×
1013
                    } else {
1014
                      scheduledRetryCopy.markCancelled();
1✔
1015
                    }
1016
                  }
1✔
1017

1018
                  callExecutor.execute(
1✔
1019
                      new Runnable() {
1✔
1020
                        @Override
1021
                        public void run() {
1022
                          drain(newSubstream);
1✔
1023
                        }
1✔
1024
                      });
1025
                }
1✔
1026
              }
1027

1028
              scheduledRetryCopy.setFuture(
1✔
1029
                  scheduledExecutorService.schedule(
1✔
1030
                      new RetryBackoffRunnable(),
1031
                      retryPlan.backoffNanos,
1032
                      TimeUnit.NANOSECONDS));
1033
              return;
1✔
1034
            }
1035
          }
1036
        }
1037
      }
1038

1039
      commitAndRun(substream);
1✔
1040
      if (state.winningSubstream == substream) {
1✔
1041
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1042
      }
1043
    }
1✔
1044

1045
    /**
1046
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1047
     * long the backoff should be. The decision does not take the commitment status into account, so
1048
     * caller should check it separately. It also updates the throttle. It does not change state.
1049
     */
1050
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1051
      if (retryPolicy == null) {
1✔
1052
        return new RetryPlan(false, 0);
1✔
1053
      }
1054
      boolean shouldRetry = false;
1✔
1055
      long backoffNanos = 0L;
1✔
1056
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1057
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1058
      boolean isThrottled = false;
1✔
1059
      if (throttle != null) {
1✔
1060
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1061
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1062
        }
1063
      }
1064

1065
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1066
        if (pushbackMillis == null) {
1✔
1067
          if (isRetryableStatusCode) {
1✔
1068
            shouldRetry = true;
1✔
1069
            backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
1✔
1070
            nextBackoffIntervalNanos = Math.min(
1✔
1071
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1072
                retryPolicy.maxBackoffNanos);
1✔
1073
          } // else no retry
1074
        } else if (pushbackMillis >= 0) {
1✔
1075
          shouldRetry = true;
1✔
1076
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1077
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1078
        } // else no retry
1079
      } // else no retry
1080

1081
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1082
    }
1083

1084
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1085
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1086
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1087
      boolean isThrottled = false;
1✔
1088
      if (throttle != null) {
1✔
1089
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1090
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1091
        }
1092
      }
1093
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1094
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1095
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1096
      }
1097
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1098
    }
1099

1100
    @Nullable
1101
    private Integer getPushbackMills(Metadata trailer) {
1102
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1103
      Integer pushbackMillis = null;
1✔
1104
      if (pushbackStr != null) {
1✔
1105
        try {
1106
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1107
        } catch (NumberFormatException e) {
1✔
1108
          pushbackMillis = -1;
1✔
1109
        }
1✔
1110
      }
1111
      return pushbackMillis;
1✔
1112
    }
1113

1114
    @Override
1115
    public void messagesAvailable(final MessageProducer producer) {
1116
      State savedState = state;
1✔
1117
      checkState(
1✔
1118
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1119
      if (savedState.winningSubstream != substream) {
1✔
1120
        GrpcUtil.closeQuietly(producer);
1✔
1121
        return;
1✔
1122
      }
1123
      listenerSerializeExecutor.execute(
1✔
1124
          new Runnable() {
1✔
1125
            @Override
1126
            public void run() {
1127
              masterListener.messagesAvailable(producer);
1✔
1128
            }
1✔
1129
          });
1130
    }
1✔
1131

1132
    @Override
1133
    public void onReady() {
1134
      // FIXME(#7089): hedging case is broken.
1135
      if (!isReady()) {
1✔
1136
        return;
1✔
1137
      }
1138
      listenerSerializeExecutor.execute(
1✔
1139
          new Runnable() {
1✔
1140
            @Override
1141
            public void run() {
1142
              if (!isClosed) {
1✔
1143
                masterListener.onReady();
1✔
1144
              }
1145
            }
1✔
1146
          });
1147
    }
1✔
1148
  }
1149

1150
  private static final class State {
1151
    /** Committed and the winning substream drained. */
1152
    final boolean passThrough;
1153

1154
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1155
    @Nullable final List<BufferEntry> buffer;
1156

1157
    /**
1158
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1159
     * passThrough; Empty if committed but not passTrough.
1160
     */
1161
    final Collection<Substream> drainedSubstreams;
1162

1163
    /**
1164
     * Unmodifiable collection of all the active hedging substreams.
1165
     *
1166
     * <p>A substream even with the attribute substream.closed being true may be considered still
1167
     * "active" at the moment as long as it is in this collection.
1168
     */
1169
    final Collection<Substream> activeHedges; // not null once isHedging = true
1170

1171
    final int hedgingAttemptCount;
1172

1173
    /** Null until committed. */
1174
    @Nullable final Substream winningSubstream;
1175

1176
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1177
    final boolean cancelled;
1178

1179
    /** No more hedging due to events like drop or pushback. */
1180
    final boolean hedgingFrozen;
1181

1182
    State(
1183
        @Nullable List<BufferEntry> buffer,
1184
        Collection<Substream> drainedSubstreams,
1185
        Collection<Substream> activeHedges,
1186
        @Nullable Substream winningSubstream,
1187
        boolean cancelled,
1188
        boolean passThrough,
1189
        boolean hedgingFrozen,
1190
        int hedgingAttemptCount) {
1✔
1191
      this.buffer = buffer;
1✔
1192
      this.drainedSubstreams =
1✔
1193
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1194
      this.winningSubstream = winningSubstream;
1✔
1195
      this.activeHedges = activeHedges;
1✔
1196
      this.cancelled = cancelled;
1✔
1197
      this.passThrough = passThrough;
1✔
1198
      this.hedgingFrozen = hedgingFrozen;
1✔
1199
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1200

1201
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1202
      checkState(
1✔
1203
          !passThrough || winningSubstream != null,
1204
          "passThrough should imply winningSubstream != null");
1205
      checkState(
1✔
1206
          !passThrough
1207
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1208
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1209
          "passThrough should imply winningSubstream is drained");
1210
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1211
    }
1✔
1212

1213
    @CheckReturnValue
1214
    // GuardedBy RetriableStream.lock
1215
    State cancelled() {
1216
      return new State(
1✔
1217
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1218
          hedgingFrozen, hedgingAttemptCount);
1219
    }
1220

1221
    /** The given substream is drained. */
1222
    @CheckReturnValue
1223
    // GuardedBy RetriableStream.lock
1224
    State substreamDrained(Substream substream) {
1225
      checkState(!passThrough, "Already passThrough");
1✔
1226

1227
      Collection<Substream> drainedSubstreams;
1228
      
1229
      if (substream.closed) {
1✔
1230
        drainedSubstreams = this.drainedSubstreams;
1✔
1231
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1232
        // optimize for 0-retry, which is most of the cases.
1233
        drainedSubstreams = Collections.singletonList(substream);
1✔
1234
      } else {
1235
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1236
        drainedSubstreams.add(substream);
1✔
1237
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1238
      }
1239

1240
      boolean passThrough = winningSubstream != null;
1✔
1241

1242
      List<BufferEntry> buffer = this.buffer;
1✔
1243
      if (passThrough) {
1✔
1244
        checkState(
1✔
1245
            winningSubstream == substream, "Another RPC attempt has already committed");
1246
        buffer = null;
1✔
1247
      }
1248

1249
      return new State(
1✔
1250
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1251
          hedgingFrozen, hedgingAttemptCount);
1252
    }
1253

1254
    /** The given substream is closed. */
1255
    @CheckReturnValue
1256
    // GuardedBy RetriableStream.lock
1257
    State substreamClosed(Substream substream) {
1258
      substream.closed = true;
1✔
1259
      if (this.drainedSubstreams.contains(substream)) {
1✔
1260
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1261
        drainedSubstreams.remove(substream);
1✔
1262
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1263
        return new State(
1✔
1264
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1265
            hedgingFrozen, hedgingAttemptCount);
1266
      } else {
1267
        return this;
1✔
1268
      }
1269
    }
1270

1271
    @CheckReturnValue
1272
    // GuardedBy RetriableStream.lock
1273
    State committed(Substream winningSubstream) {
1274
      checkState(this.winningSubstream == null, "Already committed");
1✔
1275

1276
      boolean passThrough = false;
1✔
1277
      List<BufferEntry> buffer = this.buffer;
1✔
1278
      Collection<Substream> drainedSubstreams;
1279

1280
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1281
        passThrough = true;
1✔
1282
        buffer = null;
1✔
1283
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1284
      } else {
1285
        drainedSubstreams = Collections.emptyList();
1✔
1286
      }
1287

1288
      return new State(
1✔
1289
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1290
          hedgingFrozen, hedgingAttemptCount);
1291
    }
1292

1293
    @CheckReturnValue
1294
    // GuardedBy RetriableStream.lock
1295
    State freezeHedging() {
1296
      if (hedgingFrozen) {
1✔
1297
        return this;
×
1298
      }
1299
      return new State(
1✔
1300
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1301
          true, hedgingAttemptCount);
1302
    }
1303

1304
    @CheckReturnValue
1305
    // GuardedBy RetriableStream.lock
1306
    // state.hedgingAttemptCount is modified only here.
1307
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1308
    State addActiveHedge(Substream substream) {
1309
      // hasPotentialHedging must be true
1310
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1311
      checkState(winningSubstream == null, "already committed");
1✔
1312

1313
      Collection<Substream> activeHedges;
1314
      if (this.activeHedges == null) {
1✔
1315
        activeHedges = Collections.singleton(substream);
1✔
1316
      } else {
1317
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1318
        activeHedges.add(substream);
1✔
1319
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1320
      }
1321

1322
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1323
      return new State(
1✔
1324
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1325
          hedgingFrozen, hedgingAttemptCount);
1326
    }
1327

1328
    @CheckReturnValue
1329
    // GuardedBy RetriableStream.lock
1330
    // The method is only called in Sublistener.closed()
1331
    State removeActiveHedge(Substream substream) {
1332
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1333
      activeHedges.remove(substream);
1✔
1334
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1335

1336
      return new State(
1✔
1337
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1338
          hedgingFrozen, hedgingAttemptCount);
1339
    }
1340

1341
    @CheckReturnValue
1342
    // GuardedBy RetriableStream.lock
1343
    // The method is only called for transparent retry.
1344
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1345
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1346
      activeHedges.remove(oldOne);
1✔
1347
      activeHedges.add(newOne);
1✔
1348
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1349

1350
      return new State(
1✔
1351
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1352
          hedgingFrozen, hedgingAttemptCount);
1353
    }
1354
  }
1355

1356
  /**
1357
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1358
   *  attributes.
1359
   */
1360
  private static final class Substream {
1361
    ClientStream stream;
1362

1363
    // GuardedBy RetriableStream.lock
1364
    boolean closed;
1365

1366
    // setting to true must be GuardedBy RetriableStream.lock
1367
    boolean bufferLimitExceeded;
1368

1369
    final int previousAttemptCount;
1370

1371
    Substream(int previousAttemptCount) {
1✔
1372
      this.previousAttemptCount = previousAttemptCount;
1✔
1373
    }
1✔
1374
  }
1375

1376

1377
  /**
1378
   * Traces the buffer used by a substream.
1379
   */
1380
  class BufferSizeTracer extends ClientStreamTracer {
1381
    // Each buffer size tracer is dedicated to one specific substream.
1382
    private final Substream substream;
1383

1384
    @GuardedBy("lock")
1385
    long bufferNeeded;
1386

1387
    BufferSizeTracer(Substream substream) {
1✔
1388
      this.substream = substream;
1✔
1389
    }
1✔
1390

1391
    /**
1392
     * A message is sent to the wire, so its reference would be released if no retry or
1393
     * hedging were involved. So at this point we have to hold the reference of the message longer
1394
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1395
     */
1396
    @Override
1397
    public void outboundWireSize(long bytes) {
1398
      if (state.winningSubstream != null) {
1✔
1399
        return;
1✔
1400
      }
1401

1402
      Runnable postCommitTask = null;
1✔
1403

1404
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1405
      synchronized (lock) {
1✔
1406
        if (state.winningSubstream != null || substream.closed) {
1✔
1407
          return;
×
1408
        }
1409
        bufferNeeded += bytes;
1✔
1410
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1411
          return;
1✔
1412
        }
1413

1414
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1415
          substream.bufferLimitExceeded = true;
1✔
1416
        } else {
1417
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1418
          long savedChannelBufferUsed =
1✔
1419
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1420
          perRpcBufferUsed = bufferNeeded;
1✔
1421

1422
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1423
            substream.bufferLimitExceeded = true;
1✔
1424
          }
1425
        }
1426

1427
        if (substream.bufferLimitExceeded) {
1✔
1428
          postCommitTask = commit(substream);
1✔
1429
        }
1430
      }
1✔
1431

1432
      if (postCommitTask != null) {
1✔
1433
        postCommitTask.run();
1✔
1434
      }
1435
    }
1✔
1436
  }
1437

1438
  /**
1439
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1440
   *  the Channel. There should be a single instance of it for each channel.
1441
   */
1442
  static final class ChannelBufferMeter {
1✔
1443
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1444

1445
    @VisibleForTesting
1446
    long addAndGet(long newBytesUsed) {
1447
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1448
    }
1449
  }
1450

1451
  /**
1452
   * Used for retry throttling.
1453
   */
1454
  static final class Throttle {
1455

1456
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1457

1458
    /**
1459
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1460
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1461
     * maxTokens.
1462
     */
1463
    final int maxTokens;
1464

1465
    /**
1466
     * Half of {@code maxTokens}.
1467
     */
1468
    final int threshold;
1469

1470
    /**
1471
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1472
     */
1473
    final int tokenRatio;
1474

1475
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1476

1477
    Throttle(float maxTokens, float tokenRatio) {
1✔
1478
      // tokenRatio is up to 3 decimal places
1479
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1480
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1481
      this.threshold = this.maxTokens / 2;
1✔
1482
      tokenCount.set(this.maxTokens);
1✔
1483
    }
1✔
1484

1485
    @VisibleForTesting
1486
    boolean isAboveThreshold() {
1487
      return tokenCount.get() > threshold;
1✔
1488
    }
1489

1490
    /**
1491
     * Counts down the token on qualified failure and checks if it is above the threshold
1492
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1493
     * a not-to-retry pushback.
1494
     */
1495
    @VisibleForTesting
1496
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1497
      while (true) {
1498
        int currentCount = tokenCount.get();
1✔
1499
        if (currentCount == 0) {
1✔
1500
          return false;
1✔
1501
        }
1502
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1503
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1504
        if (updated) {
1✔
1505
          return decremented > threshold;
1✔
1506
        }
1507
      }
×
1508
    }
1509

1510
    @VisibleForTesting
1511
    void onSuccess() {
1512
      while (true) {
1513
        int currentCount = tokenCount.get();
1✔
1514
        if (currentCount == maxTokens) {
1✔
1515
          break;
1✔
1516
        }
1517
        int incremented = currentCount + tokenRatio;
1✔
1518
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1519
        if (updated) {
1✔
1520
          break;
1✔
1521
        }
1522
      }
×
1523
    }
1✔
1524

1525
    @Override
1526
    public boolean equals(Object o) {
1527
      if (this == o) {
1✔
1528
        return true;
×
1529
      }
1530
      if (!(o instanceof Throttle)) {
1✔
1531
        return false;
×
1532
      }
1533
      Throttle that = (Throttle) o;
1✔
1534
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1535
    }
1536

1537
    @Override
1538
    public int hashCode() {
1539
      return Objects.hashCode(maxTokens, tokenRatio);
×
1540
    }
1541
  }
1542

1543
  private static final class RetryPlan {
1544
    final boolean shouldRetry;
1545
    final long backoffNanos;
1546

1547
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1548
      this.shouldRetry = shouldRetry;
1✔
1549
      this.backoffNanos = backoffNanos;
1✔
1550
    }
1✔
1551
  }
1552

1553
  private static final class HedgingPlan {
1554
    final boolean isHedgeable;
1555
    @Nullable
1556
    final Integer hedgingPushbackMillis;
1557

1558
    public HedgingPlan(
1559
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1560
      this.isHedgeable = isHedgeable;
1✔
1561
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1562
    }
1✔
1563
  }
1564

1565
  /** Allows cancelling a Future without racing with setting the future. */
1566
  private static final class FutureCanceller {
1567

1568
    final Object lock;
1569
    @GuardedBy("lock")
1570
    Future<?> future;
1571
    @GuardedBy("lock")
1572
    boolean cancelled;
1573

1574
    FutureCanceller(Object lock) {
1✔
1575
      this.lock = lock;
1✔
1576
    }
1✔
1577

1578
    void setFuture(Future<?> future) {
1579
      boolean wasCancelled;
1580
      synchronized (lock) {
1✔
1581
        wasCancelled = cancelled;
1✔
1582
        if (!wasCancelled) {
1✔
1583
          this.future = future;
1✔
1584
        }
1585
      }
1✔
1586
      if (wasCancelled) {
1✔
1587
        future.cancel(false);
×
1588
      }
1589
    }
1✔
1590

1591
    @GuardedBy("lock")
1592
    @CheckForNull // Must cancel the returned future if not null.
1593
    Future<?> markCancelled() {
1594
      cancelled = true;
1✔
1595
      return future;
1✔
1596
    }
1597

1598
    @GuardedBy("lock")
1599
    boolean isCancelled() {
1600
      return cancelled;
1✔
1601
    }
1602
  }
1603
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc