• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18712

pending completion
#18712

push

github-actions

web-flow
Sort the policies in a rule by policy name when parsing from proto. (#10334)

* Sort the policies in a rule by policy name when parsing from proto.  This fixes the server sending a GOAWAY when an LDS update with no changes other than ordering is received.

* Remove use of deprecated method setSourceIp

* Fix style issues

* Update RbacFilterTest.java

30569 of 34649 relevant lines covered (88.22%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.4
/../core/src/main/java/io/grpc/internal/RetriableStream.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Objects;
25
import io.grpc.Attributes;
26
import io.grpc.ClientStreamTracer;
27
import io.grpc.Compressor;
28
import io.grpc.Deadline;
29
import io.grpc.DecompressorRegistry;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import java.io.InputStream;
36
import java.lang.Thread.UncaughtExceptionHandler;
37
import java.util.ArrayList;
38
import java.util.Collection;
39
import java.util.Collections;
40
import java.util.List;
41
import java.util.Random;
42
import java.util.concurrent.Executor;
43
import java.util.concurrent.Future;
44
import java.util.concurrent.ScheduledExecutorService;
45
import java.util.concurrent.TimeUnit;
46
import java.util.concurrent.atomic.AtomicBoolean;
47
import java.util.concurrent.atomic.AtomicInteger;
48
import java.util.concurrent.atomic.AtomicLong;
49
import javax.annotation.CheckForNull;
50
import javax.annotation.CheckReturnValue;
51
import javax.annotation.Nullable;
52
import javax.annotation.concurrent.GuardedBy;
53

54
/** A logical {@link ClientStream} that is retriable. */
55
abstract class RetriableStream<ReqT> implements ClientStream {
56
  @VisibleForTesting
57
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
1✔
58
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
1✔
59

60
  @VisibleForTesting
61
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
1✔
62
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
1✔
63

64
  private static final Status CANCELLED_BECAUSE_COMMITTED =
1✔
65
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
1✔
66

67
  private final MethodDescriptor<ReqT, ?> method;
68
  private final Executor callExecutor;
69
  private final Executor listenerSerializeExecutor = new SynchronizationContext(
1✔
70
      new UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          throw Status.fromThrowable(e)
×
74
              .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
×
75
              .asRuntimeException();
×
76
        }
77
      }
78
  );
79
  private final ScheduledExecutorService scheduledExecutorService;
80
  // Must not modify it.
81
  private final Metadata headers;
82
  @Nullable
83
  private final RetryPolicy retryPolicy;
84
  @Nullable
85
  private final HedgingPolicy hedgingPolicy;
86
  private final boolean isHedging;
87

88
  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89
  private final Object lock = new Object();
1✔
90

91
  private final ChannelBufferMeter channelBufferUsed;
92
  private final long perRpcBufferLimit;
93
  private final long channelBufferLimit;
94
  @Nullable
95
  private final Throttle throttle;
96
  @GuardedBy("lock")
1✔
97
  private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98

99
  private volatile State state = new State(
1✔
100
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
1✔
101
      false, 0);
102

103
  /**
104
   * Either non-local transparent retry happened or reached server's application logic.
105
   *
106
   * <p>Note that local-only transparent retries are unlimited.
107
   */
108
  private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
1✔
109
  private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
1✔
110
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
1✔
111
  private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112

113
  // Used for recording the share of buffer used for the current call out of the channel buffer.
114
  // This field would not be necessary if there is no channel buffer limit.
115
  @GuardedBy("lock")
116
  private long perRpcBufferUsed;
117

118
  private ClientStreamListener masterListener;
119
  @GuardedBy("lock")
120
  private FutureCanceller scheduledRetry;
121
  @GuardedBy("lock")
122
  private FutureCanceller scheduledHedging;
123
  private long nextBackoffIntervalNanos;
124
  private Status cancellationStatus;
125
  private boolean isClosed;
126

127
  RetriableStream(
128
      MethodDescriptor<ReqT, ?> method, Metadata headers,
129
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
130
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
131
      @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
132
      @Nullable Throttle throttle) {
1✔
133
    this.method = method;
1✔
134
    this.channelBufferUsed = channelBufferUsed;
1✔
135
    this.perRpcBufferLimit = perRpcBufferLimit;
1✔
136
    this.channelBufferLimit = channelBufferLimit;
1✔
137
    this.callExecutor = callExecutor;
1✔
138
    this.scheduledExecutorService = scheduledExecutorService;
1✔
139
    this.headers = headers;
1✔
140
    this.retryPolicy = retryPolicy;
1✔
141
    if (retryPolicy != null) {
1✔
142
      this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
143
    }
144
    this.hedgingPolicy = hedgingPolicy;
1✔
145
    checkArgument(
1✔
146
        retryPolicy == null || hedgingPolicy == null,
147
        "Should not provide both retryPolicy and hedgingPolicy");
148
    this.isHedging = hedgingPolicy != null;
1✔
149
    this.throttle = throttle;
1✔
150
  }
1✔
151

152
  @SuppressWarnings("GuardedBy")
153
  @Nullable // null if already committed
154
  @CheckReturnValue
155
  private Runnable commit(final Substream winningSubstream) {
156

157
    synchronized (lock) {
1✔
158
      if (state.winningSubstream != null) {
1✔
159
        return null;
1✔
160
      }
161
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
1✔
162

163
      state = state.committed(winningSubstream);
1✔
164

165
      // subtract the share of this RPC from channelBufferUsed.
166
      channelBufferUsed.addAndGet(-perRpcBufferUsed);
1✔
167

168
      final Future<?> retryFuture;
169
      if (scheduledRetry != null) {
1✔
170
        // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171
        // found: 'this.lock'
172
        retryFuture = scheduledRetry.markCancelled();
1✔
173
        scheduledRetry = null;
1✔
174
      } else {
175
        retryFuture = null;
1✔
176
      }
177
      // cancel the scheduled hedging if it is scheduled prior to the commitment
178
      final Future<?> hedgingFuture;
179
      if (scheduledHedging != null) {
1✔
180
        // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181
        // found: 'this.lock'
182
        hedgingFuture = scheduledHedging.markCancelled();
1✔
183
        scheduledHedging = null;
1✔
184
      } else {
185
        hedgingFuture = null;
1✔
186
      }
187

188
      class CommitTask implements Runnable {
1✔
189
        @Override
190
        public void run() {
191
          // For hedging only, not needed for normal retry
192
          for (Substream substream : savedDrainedSubstreams) {
1✔
193
            if (substream != winningSubstream) {
1✔
194
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
1✔
195
            }
196
          }
1✔
197
          if (retryFuture != null) {
1✔
198
            retryFuture.cancel(false);
1✔
199
          }
200
          if (hedgingFuture != null) {
1✔
201
            hedgingFuture.cancel(false);
1✔
202
          }
203

204
          postCommit();
1✔
205
        }
1✔
206
      }
207

208
      return new CommitTask();
1✔
209
    }
210
  }
211

212
  abstract void postCommit();
213

214
  /**
215
   * Calls commit() and if successful runs the post commit task.
216
   */
217
  private void commitAndRun(Substream winningSubstream) {
218
    Runnable postCommitTask = commit(winningSubstream);
1✔
219

220
    if (postCommitTask != null) {
1✔
221
      postCommitTask.run();
1✔
222
    }
223
  }
1✔
224

225
  // returns null means we should not create new sub streams, e.g. cancelled or
226
  // other close condition is met for retriableStream.
227
  @Nullable
228
  private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
229
    int inFlight;
230
    do {
231
      inFlight = inFlightSubStreams.get();
1✔
232
      if (inFlight < 0) {
1✔
233
        return null;
×
234
      }
235
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
1✔
236
    Substream sub = new Substream(previousAttemptCount);
1✔
237
    // one tracer per substream
238
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
1✔
239
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
1✔
240
      @Override
241
      public ClientStreamTracer newClientStreamTracer(
242
          ClientStreamTracer.StreamInfo info, Metadata headers) {
243
        return bufferSizeTracer;
1✔
244
      }
245
    };
246

247
    Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
1✔
248
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
249
    sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
1✔
250
    return sub;
1✔
251
  }
252

253
  /**
254
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
255
   * Client stream is not yet started.
256
   */
257
  abstract ClientStream newSubstream(
258
      Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
259
      boolean isTransparentRetry);
260

261
  /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
262
  @VisibleForTesting
263
  final Metadata updateHeaders(
264
      Metadata originalHeaders, int previousAttemptCount) {
265
    Metadata newHeaders = new Metadata();
1✔
266
    newHeaders.merge(originalHeaders);
1✔
267
    if (previousAttemptCount > 0) {
1✔
268
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
1✔
269
    }
270
    return newHeaders;
1✔
271
  }
272

273
  private void drain(Substream substream) {
274
    int index = 0;
1✔
275
    int chunk = 0x80;
1✔
276
    List<BufferEntry> list = null;
1✔
277
    boolean streamStarted = false;
1✔
278
    Runnable onReadyRunnable = null;
1✔
279

280
    while (true) {
281
      State savedState;
282

283
      synchronized (lock) {
1✔
284
        savedState = state;
1✔
285
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
286
          // committed but not me, to be cancelled
287
          break;
1✔
288
        }
289
        if (savedState.cancelled) {
1✔
290
          break;
1✔
291
        }
292
        if (index == savedState.buffer.size()) { // I'm drained
1✔
293
          state = savedState.substreamDrained(substream);
1✔
294
          if (!isReady()) {
1✔
295
            return;
1✔
296
          }
297
          onReadyRunnable = new Runnable() {
1✔
298
            @Override
299
            public void run() {
300
              if (!isClosed) {
1✔
301
                masterListener.onReady();
1✔
302
              }
303
            }
1✔
304
          };
305
          break;
1✔
306
        }
307

308
        if (substream.closed) {
1✔
309
          return;
×
310
        }
311

312
        int stop = Math.min(index + chunk, savedState.buffer.size());
1✔
313
        if (list == null) {
1✔
314
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
1✔
315
        } else {
316
          list.clear();
1✔
317
          list.addAll(savedState.buffer.subList(index, stop));
1✔
318
        }
319
        index = stop;
1✔
320
      }
1✔
321

322
      for (BufferEntry bufferEntry : list) {
1✔
323
        bufferEntry.runWith(substream);
1✔
324
        if (bufferEntry instanceof RetriableStream.StartEntry) {
1✔
325
          streamStarted = true;
1✔
326
        }
327
        savedState = state;
1✔
328
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
1✔
329
          // committed but not me, to be cancelled
330
          break;
1✔
331
        }
332
        if (savedState.cancelled) {
1✔
333
          break;
1✔
334
        }
335
      }
1✔
336
    }
1✔
337

338
    if (onReadyRunnable != null) {
1✔
339
      listenerSerializeExecutor.execute(onReadyRunnable);
1✔
340
      return;
1✔
341
    }
342

343
    if (!streamStarted) {
1✔
344
      // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
345
      substream.stream.start(new Sublistener(substream));
1✔
346
    }
347
    substream.stream.cancel(
1✔
348
        state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
1✔
349
  }
1✔
350

351
  /**
352
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
353
   */
354
  @CheckReturnValue
355
  @Nullable
356
  abstract Status prestart();
357

358
  class StartEntry implements BufferEntry {
1✔
359
    @Override
360
    public void runWith(Substream substream) {
361
      substream.stream.start(new Sublistener(substream));
1✔
362
    }
1✔
363
  }
364

365
  /** Starts the first PRC attempt. */
366
  @Override
367
  public final void start(ClientStreamListener listener) {
368
    masterListener = listener;
1✔
369

370
    Status shutdownStatus = prestart();
1✔
371

372
    if (shutdownStatus != null) {
1✔
373
      cancel(shutdownStatus);
1✔
374
      return;
1✔
375
    }
376

377
    synchronized (lock) {
1✔
378
      state.buffer.add(new StartEntry());
1✔
379
    }
1✔
380

381
    Substream substream = createSubstream(0, false);
1✔
382
    if (substream == null) {
1✔
383
      return;
×
384
    }
385
    if (isHedging) {
1✔
386
      FutureCanceller scheduledHedgingRef = null;
1✔
387

388
      synchronized (lock) {
1✔
389
        state = state.addActiveHedge(substream);
1✔
390
        if (hasPotentialHedging(state)
1✔
391
            && (throttle == null || throttle.isAboveThreshold())) {
1✔
392
          scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
1✔
393
        }
394
      }
1✔
395

396
      if (scheduledHedgingRef != null) {
1✔
397
        scheduledHedgingRef.setFuture(
1✔
398
            scheduledExecutorService.schedule(
1✔
399
                new HedgingRunnable(scheduledHedgingRef),
400
                hedgingPolicy.hedgingDelayNanos,
401
                TimeUnit.NANOSECONDS));
402
      }
403
    }
404

405
    drain(substream);
1✔
406
  }
1✔
407

408
  @SuppressWarnings("GuardedBy")
409
  private void pushbackHedging(@Nullable Integer delayMillis) {
410
    if (delayMillis == null) {
1✔
411
      return;
1✔
412
    }
413
    if (delayMillis < 0) {
1✔
414
      freezeHedging();
1✔
415
      return;
1✔
416
    }
417

418
    // Cancels the current scheduledHedging and reschedules a new one.
419
    FutureCanceller future;
420
    Future<?> futureToBeCancelled;
421

422
    synchronized (lock) {
1✔
423
      if (scheduledHedging == null) {
1✔
424
        return;
×
425
      }
426

427
      // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
428
      // found: 'this.lock'
429
      futureToBeCancelled = scheduledHedging.markCancelled();
1✔
430
      scheduledHedging = future = new FutureCanceller(lock);
1✔
431
    }
1✔
432

433
    if (futureToBeCancelled != null) {
1✔
434
      futureToBeCancelled.cancel(false);
1✔
435
    }
436
    future.setFuture(scheduledExecutorService.schedule(
1✔
437
        new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
1✔
438
  }
1✔
439

440
  private final class HedgingRunnable implements Runnable {
441

442
    // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
443
    // by a positive push-back just after newSubstream is instantiated, so that we can double check.
444
    final FutureCanceller scheduledHedgingRef;
445

446
    HedgingRunnable(FutureCanceller scheduledHedging) {
1✔
447
      scheduledHedgingRef = scheduledHedging;
1✔
448
    }
1✔
449

450
    @Override
451
    public void run() {
452
      // It's safe to read state.hedgingAttemptCount here.
453
      // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
454
      // until state.addActiveHedge() is called subsequently, even the state could possibly
455
      // change.
456
      Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
1✔
457
      if (newSubstream == null) {
1✔
458
        return;
×
459
      }
460
      callExecutor.execute(
1✔
461
          new Runnable() {
1✔
462
            @SuppressWarnings("GuardedBy")
463
            @Override
464
            public void run() {
465
              boolean cancelled = false;
1✔
466
              FutureCanceller future = null;
1✔
467

468
              synchronized (lock) {
1✔
469
                // TODO(b/145386688): This access should be guarded by
470
                // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
471
                // 'RetriableStream.this.lock'
472
                if (scheduledHedgingRef.isCancelled()) {
1✔
473
                  cancelled = true;
×
474
                } else {
475
                  state = state.addActiveHedge(newSubstream);
1✔
476
                  if (hasPotentialHedging(state)
1✔
477
                      && (throttle == null || throttle.isAboveThreshold())) {
1✔
478
                    scheduledHedging = future = new FutureCanceller(lock);
1✔
479
                  } else {
480
                    state = state.freezeHedging();
1✔
481
                    scheduledHedging = null;
1✔
482
                  }
483
                }
484
              }
1✔
485

486
              if (cancelled) {
1✔
487
                // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
488
                newSubstream.stream.start(new Sublistener(newSubstream));
×
489
                newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
×
490
                return;
×
491
              }
492
              if (future != null) {
1✔
493
                future.setFuture(
1✔
494
                    scheduledExecutorService.schedule(
1✔
495
                        new HedgingRunnable(future),
496
                        hedgingPolicy.hedgingDelayNanos,
1✔
497
                        TimeUnit.NANOSECONDS));
498
              }
499
              drain(newSubstream);
1✔
500
            }
1✔
501
          });
502
    }
1✔
503
  }
504

505
  @Override
506
  public final void cancel(final Status reason) {
507
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
1✔
508
    noopSubstream.stream = new NoopClientStream();
1✔
509
    Runnable runnable = commit(noopSubstream);
1✔
510

511
    if (runnable != null) {
1✔
512
      synchronized (lock) {
1✔
513
        state = state.substreamDrained(noopSubstream);
1✔
514
      }
1✔
515
      runnable.run();
1✔
516
      safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
1✔
517
      return;
1✔
518
    }
519

520
    Substream winningSubstreamToCancel = null;
1✔
521
    synchronized (lock) {
1✔
522
      if (state.drainedSubstreams.contains(state.winningSubstream)) {
1✔
523
        winningSubstreamToCancel = state.winningSubstream;
1✔
524
      } else { // the winningSubstream will be cancelled while draining
525
        cancellationStatus = reason;
1✔
526
      }
527
      state = state.cancelled();
1✔
528
    }
1✔
529
    if (winningSubstreamToCancel != null) {
1✔
530
      winningSubstreamToCancel.stream.cancel(reason);
1✔
531
    }
532
  }
1✔
533

534
  private void delayOrExecute(BufferEntry bufferEntry) {
535
    Collection<Substream> savedDrainedSubstreams;
536
    synchronized (lock) {
1✔
537
      if (!state.passThrough) {
1✔
538
        state.buffer.add(bufferEntry);
1✔
539
      }
540
      savedDrainedSubstreams = state.drainedSubstreams;
1✔
541
    }
1✔
542

543
    for (Substream substream : savedDrainedSubstreams) {
1✔
544
      bufferEntry.runWith(substream);
1✔
545
    }
1✔
546
  }
1✔
547

548
  /**
549
   * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
550
   * InputStream for buffering.
551
   */
552
  @Override
553
  public final void writeMessage(InputStream message) {
554
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
×
555
  }
556

557
  final void sendMessage(final ReqT message) {
558
    State savedState = state;
1✔
559
    if (savedState.passThrough) {
1✔
560
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
1✔
561
      return;
1✔
562
    }
563

564
    class SendMessageEntry implements BufferEntry {
1✔
565
      @Override
566
      public void runWith(Substream substream) {
567
        substream.stream.writeMessage(method.streamRequest(message));
1✔
568
        // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
569
        // flushes (or half close), but retry appears to have a code path that the flushes may
570
        // not happen. The code needs to be fixed and this removed. See #9340.
571
        substream.stream.flush();
1✔
572
      }
1✔
573
    }
574

575
    delayOrExecute(new SendMessageEntry());
1✔
576
  }
1✔
577

578
  @Override
579
  public final void request(final int numMessages) {
580
    State savedState = state;
1✔
581
    if (savedState.passThrough) {
1✔
582
      savedState.winningSubstream.stream.request(numMessages);
1✔
583
      return;
1✔
584
    }
585

586
    class RequestEntry implements BufferEntry {
1✔
587
      @Override
588
      public void runWith(Substream substream) {
589
        substream.stream.request(numMessages);
1✔
590
      }
1✔
591
    }
592

593
    delayOrExecute(new RequestEntry());
1✔
594
  }
1✔
595

596
  @Override
597
  public final void flush() {
598
    State savedState = state;
1✔
599
    if (savedState.passThrough) {
1✔
600
      savedState.winningSubstream.stream.flush();
1✔
601
      return;
1✔
602
    }
603

604
    class FlushEntry implements BufferEntry {
1✔
605
      @Override
606
      public void runWith(Substream substream) {
607
        substream.stream.flush();
1✔
608
      }
1✔
609
    }
610

611
    delayOrExecute(new FlushEntry());
1✔
612
  }
1✔
613

614
  @Override
615
  public final boolean isReady() {
616
    for (Substream substream : state.drainedSubstreams) {
1✔
617
      if (substream.stream.isReady()) {
1✔
618
        return true;
1✔
619
      }
620
    }
1✔
621
    return false;
1✔
622
  }
623

624
  @Override
625
  public void optimizeForDirectExecutor() {
626
    class OptimizeDirectEntry implements BufferEntry {
1✔
627
      @Override
628
      public void runWith(Substream substream) {
629
        substream.stream.optimizeForDirectExecutor();
1✔
630
      }
1✔
631
    }
632

633
    delayOrExecute(new OptimizeDirectEntry());
1✔
634
  }
1✔
635

636
  @Override
637
  public final void setCompressor(final Compressor compressor) {
638
    class CompressorEntry implements BufferEntry {
1✔
639
      @Override
640
      public void runWith(Substream substream) {
641
        substream.stream.setCompressor(compressor);
1✔
642
      }
1✔
643
    }
644

645
    delayOrExecute(new CompressorEntry());
1✔
646
  }
1✔
647

648
  @Override
649
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
650
    class FullStreamDecompressionEntry implements BufferEntry {
1✔
651
      @Override
652
      public void runWith(Substream substream) {
653
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
1✔
654
      }
1✔
655
    }
656

657
    delayOrExecute(new FullStreamDecompressionEntry());
1✔
658
  }
1✔
659

660
  @Override
661
  public final void setMessageCompression(final boolean enable) {
662
    class MessageCompressionEntry implements BufferEntry {
1✔
663
      @Override
664
      public void runWith(Substream substream) {
665
        substream.stream.setMessageCompression(enable);
1✔
666
      }
1✔
667
    }
668

669
    delayOrExecute(new MessageCompressionEntry());
1✔
670
  }
1✔
671

672
  @Override
673
  public final void halfClose() {
674
    class HalfCloseEntry implements BufferEntry {
1✔
675
      @Override
676
      public void runWith(Substream substream) {
677
        substream.stream.halfClose();
1✔
678
      }
1✔
679
    }
680

681
    delayOrExecute(new HalfCloseEntry());
1✔
682
  }
1✔
683

684
  @Override
685
  public final void setAuthority(final String authority) {
686
    class AuthorityEntry implements BufferEntry {
1✔
687
      @Override
688
      public void runWith(Substream substream) {
689
        substream.stream.setAuthority(authority);
1✔
690
      }
1✔
691
    }
692

693
    delayOrExecute(new AuthorityEntry());
1✔
694
  }
1✔
695

696
  @Override
697
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
698
    class DecompressorRegistryEntry implements BufferEntry {
1✔
699
      @Override
700
      public void runWith(Substream substream) {
701
        substream.stream.setDecompressorRegistry(decompressorRegistry);
1✔
702
      }
1✔
703
    }
704

705
    delayOrExecute(new DecompressorRegistryEntry());
1✔
706
  }
1✔
707

708
  @Override
709
  public final void setMaxInboundMessageSize(final int maxSize) {
710
    class MaxInboundMessageSizeEntry implements BufferEntry {
1✔
711
      @Override
712
      public void runWith(Substream substream) {
713
        substream.stream.setMaxInboundMessageSize(maxSize);
1✔
714
      }
1✔
715
    }
716

717
    delayOrExecute(new MaxInboundMessageSizeEntry());
1✔
718
  }
1✔
719

720
  @Override
721
  public final void setMaxOutboundMessageSize(final int maxSize) {
722
    class MaxOutboundMessageSizeEntry implements BufferEntry {
1✔
723
      @Override
724
      public void runWith(Substream substream) {
725
        substream.stream.setMaxOutboundMessageSize(maxSize);
1✔
726
      }
1✔
727
    }
728

729
    delayOrExecute(new MaxOutboundMessageSizeEntry());
1✔
730
  }
1✔
731

732
  @Override
733
  public final void setDeadline(final Deadline deadline) {
734
    class DeadlineEntry implements BufferEntry {
1✔
735
      @Override
736
      public void runWith(Substream substream) {
737
        substream.stream.setDeadline(deadline);
1✔
738
      }
1✔
739
    }
740

741
    delayOrExecute(new DeadlineEntry());
1✔
742
  }
1✔
743

744
  @Override
745
  public final Attributes getAttributes() {
746
    if (state.winningSubstream != null) {
1✔
747
      return state.winningSubstream.stream.getAttributes();
1✔
748
    }
749
    return Attributes.EMPTY;
×
750
  }
751

752
  @Override
753
  public void appendTimeoutInsight(InsightBuilder insight) {
754
    State currentState;
755
    synchronized (lock) {
1✔
756
      insight.appendKeyValue("closed", closedSubstreamsInsight);
1✔
757
      currentState = state;
1✔
758
    }
1✔
759
    if (currentState.winningSubstream != null) {
1✔
760
      // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
761
      // of the winning substream, they may not have received closed() notifications yet, thus they
762
      // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
763
      // We need to figure out how to include them.
764
      InsightBuilder substreamInsight = new InsightBuilder();
1✔
765
      currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
1✔
766
      insight.appendKeyValue("committed", substreamInsight);
1✔
767
    } else {
1✔
768
      InsightBuilder openSubstreamsInsight = new InsightBuilder();
1✔
769
      // drainedSubstreams doesn't include all open substreams.  Those which have just been created
770
      // and are still catching up with buffered requests (in other words, still draining) will not
771
      // show up.  We think this is benign, because the draining should be typically fast, and it'd
772
      // be indistinguishable from the case where those streams are to be created a little late due
773
      // to delays in the timer.
774
      for (Substream sub : currentState.drainedSubstreams) {
1✔
775
        InsightBuilder substreamInsight = new InsightBuilder();
1✔
776
        sub.stream.appendTimeoutInsight(substreamInsight);
1✔
777
        openSubstreamsInsight.append(substreamInsight);
1✔
778
      }
1✔
779
      insight.appendKeyValue("open", openSubstreamsInsight);
1✔
780
    }
781
  }
1✔
782

783
  private static Random random = new Random();
1✔
784

785
  @VisibleForTesting
786
  static void setRandom(Random random) {
787
    RetriableStream.random = random;
1✔
788
  }
1✔
789

790
  /**
791
   * Whether there is any potential hedge at the moment. A false return value implies there is
792
   * absolutely no potential hedge. At least one of the hedges will observe a false return value
793
   * when calling this method, unless otherwise the rpc is committed.
794
   */
795
  // only called when isHedging is true
796
  @GuardedBy("lock")
797
  private boolean hasPotentialHedging(State state) {
798
    return state.winningSubstream == null
1✔
799
        && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
800
        && !state.hedgingFrozen;
801
  }
802

803
  @SuppressWarnings("GuardedBy")
804
  private void freezeHedging() {
805
    Future<?> futureToBeCancelled = null;
1✔
806
    synchronized (lock) {
1✔
807
      if (scheduledHedging != null) {
1✔
808
        // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
809
        // found: 'this.lock'
810
        futureToBeCancelled = scheduledHedging.markCancelled();
1✔
811
        scheduledHedging = null;
1✔
812
      }
813
      state = state.freezeHedging();
1✔
814
    }
1✔
815

816
    if (futureToBeCancelled != null) {
1✔
817
      futureToBeCancelled.cancel(false);
1✔
818
    }
819
  }
1✔
820

821
  private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
822
    savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
1✔
823
        metadata);
824
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
1✔
825
      listenerSerializeExecutor.execute(
1✔
826
          new Runnable() {
1✔
827
            @Override
828
            public void run() {
829
              isClosed = true;
1✔
830
              masterListener.closed(status, progress, metadata);
1✔
831
            }
1✔
832
          });
833
    }
834
  }
1✔
835

836
  private static final class SavedCloseMasterListenerReason {
837
    private final Status status;
838
    private final RpcProgress progress;
839
    private final Metadata metadata;
840

841
    SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
1✔
842
      this.status = status;
1✔
843
      this.progress = progress;
1✔
844
      this.metadata = metadata;
1✔
845
    }
1✔
846
  }
847

848
  private interface BufferEntry {
849
    /** Replays the buffer entry with the given stream. */
850
    void runWith(Substream substream);
851
  }
852

853
  private final class Sublistener implements ClientStreamListener {
1✔
854
    final Substream substream;
855

856
    Sublistener(Substream substream) {
1✔
857
      this.substream = substream;
1✔
858
    }
1✔
859

860
    @Override
861
    public void headersRead(final Metadata headers) {
862
      if (substream.previousAttemptCount > 0) {
1✔
863
        headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
1✔
864
        headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
1✔
865
      }
866
      commitAndRun(substream);
1✔
867
      if (state.winningSubstream == substream) {
1✔
868
        if (throttle != null) {
1✔
869
          throttle.onSuccess();
1✔
870
        }
871
        listenerSerializeExecutor.execute(
1✔
872
            new Runnable() {
1✔
873
              @Override
874
              public void run() {
875
                masterListener.headersRead(headers);
1✔
876
              }
1✔
877
            });
878
      }
879
    }
1✔
880

881
    @Override
882
    public void closed(
883
        final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
884
      synchronized (lock) {
1✔
885
        state = state.substreamClosed(substream);
1✔
886
        closedSubstreamsInsight.append(status.getCode());
1✔
887
      }
1✔
888

889
      if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
1✔
890
        assert savedCloseMasterListenerReason != null;
1✔
891
        listenerSerializeExecutor.execute(
1✔
892
            new Runnable() {
1✔
893
              @Override
894
              public void run() {
895
                isClosed = true;
1✔
896
                masterListener.closed(savedCloseMasterListenerReason.status,
1✔
897
                    savedCloseMasterListenerReason.progress,
1✔
898
                    savedCloseMasterListenerReason.metadata);
1✔
899
              }
1✔
900
            });
901
        return;
1✔
902
      }
903

904
      // handle a race between buffer limit exceeded and closed, when setting
905
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
906
      if (substream.bufferLimitExceeded) {
1✔
907
        commitAndRun(substream);
1✔
908
        if (state.winningSubstream == substream) {
1✔
909
          safeCloseMasterListener(status, rpcProgress, trailers);
1✔
910
        }
911
        return;
1✔
912
      }
913
      if (rpcProgress == RpcProgress.MISCARRIED
1✔
914
          && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
1✔
915
        commitAndRun(substream);
1✔
916
        if (state.winningSubstream == substream) {
1✔
917
          Status tooManyTransparentRetries = Status.INTERNAL
1✔
918
              .withDescription("Too many transparent retries. Might be a bug in gRPC")
1✔
919
              .withCause(status.asRuntimeException());
1✔
920
          safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
1✔
921
        }
922
        return;
1✔
923
      }
924

925
      if (state.winningSubstream == null) {
1✔
926
        if (rpcProgress == RpcProgress.MISCARRIED
1✔
927
            || (rpcProgress == RpcProgress.REFUSED
928
                && noMoreTransparentRetry.compareAndSet(false, true))) {
1✔
929
          // transparent retry
930
          final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
1✔
931
          if (newSubstream == null) {
1✔
932
            return;
×
933
          }
934
          if (isHedging) {
1✔
935
            synchronized (lock) {
1✔
936
              // Although this operation is not done atomically with
937
              // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
938
              // activeHedges, so neither does it affect the commitment decision of other threads,
939
              // nor do the commitment decision making threads affect itself.
940
              state = state.replaceActiveHedge(substream, newSubstream);
1✔
941
            }
1✔
942
          }
943

944
          callExecutor.execute(new Runnable() {
1✔
945
            @Override
946
            public void run() {
947
              drain(newSubstream);
1✔
948
            }
1✔
949
          });
950
          return;
1✔
951
        } else if (rpcProgress == RpcProgress.DROPPED) {
1✔
952
          // For normal retry, nothing need be done here, will just commit.
953
          // For hedging, cancel scheduled hedge that is scheduled prior to the drop
954
          if (isHedging) {
1✔
955
            freezeHedging();
×
956
          }
957
        } else {
958
          noMoreTransparentRetry.set(true);
1✔
959

960
          if (isHedging) {
1✔
961
            HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
1✔
962
            if (hedgingPlan.isHedgeable) {
1✔
963
              pushbackHedging(hedgingPlan.hedgingPushbackMillis);
1✔
964
            }
965
            synchronized (lock) {
1✔
966
              state = state.removeActiveHedge(substream);
1✔
967
              // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
968
              // Once hasPotentialHedging(state) is false, it will always be false, and then
969
              // #(state.activeHedges) will be decreasing. This guarantees that even there may be
970
              // multiple concurrent hedges, one of the hedges will end up committed.
971
              if (hedgingPlan.isHedgeable) {
1✔
972
                if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
1✔
973
                  return;
1✔
974
                }
975
                // else, no activeHedges, no new hedges possible, try to commit
976
              } // else, isHedgeable is false, try to commit
977
            }
1✔
978
          } else {
1✔
979
            RetryPlan retryPlan = makeRetryDecision(status, trailers);
1✔
980
            if (retryPlan.shouldRetry) {
1✔
981
              // retry
982
              Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
1✔
983
              if (newSubstream == null) {
1✔
984
                return;
×
985
              }
986
              // The check state.winningSubstream == null, checking if is not already committed, is
987
              // racy, but is still safe b/c the retry will also handle committed/cancellation
988
              FutureCanceller scheduledRetryCopy;
989
              synchronized (lock) {
1✔
990
                scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
1✔
991
              }
1✔
992
              class RetryBackoffRunnable implements Runnable {
1✔
993
                @Override
994
                public void run() {
995
                  callExecutor.execute(
1✔
996
                      new Runnable() {
1✔
997
                        @Override
998
                        public void run() {
999
                          drain(newSubstream);
1✔
1000
                        }
1✔
1001
                      });
1002
                }
1✔
1003
              }
1004

1005
              scheduledRetryCopy.setFuture(
1✔
1006
                  scheduledExecutorService.schedule(
1✔
1007
                      new RetryBackoffRunnable(),
1008
                      retryPlan.backoffNanos,
1009
                      TimeUnit.NANOSECONDS));
1010
              return;
1✔
1011
            }
1012
          }
1013
        }
1014
      }
1015

1016
      commitAndRun(substream);
1✔
1017
      if (state.winningSubstream == substream) {
1✔
1018
        safeCloseMasterListener(status, rpcProgress, trailers);
1✔
1019
      }
1020
    }
1✔
1021

1022
    /**
1023
     * Decides in current situation whether or not the RPC should retry and if it should retry how
1024
     * long the backoff should be. The decision does not take the commitment status into account, so
1025
     * caller should check it separately. It also updates the throttle. It does not change state.
1026
     */
1027
    private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1028
      if (retryPolicy == null) {
1✔
1029
        return new RetryPlan(false, 0);
1✔
1030
      }
1031
      boolean shouldRetry = false;
1✔
1032
      long backoffNanos = 0L;
1✔
1033
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1✔
1034
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1035
      boolean isThrottled = false;
1✔
1036
      if (throttle != null) {
1✔
1037
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1038
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1039
        }
1040
      }
1041

1042
      if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1✔
1043
        if (pushbackMillis == null) {
1✔
1044
          if (isRetryableStatusCode) {
1✔
1045
            shouldRetry = true;
1✔
1046
            backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
1✔
1047
            nextBackoffIntervalNanos = Math.min(
1✔
1048
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1✔
1049
                retryPolicy.maxBackoffNanos);
1✔
1050
          } // else no retry
1051
        } else if (pushbackMillis >= 0) {
1✔
1052
          shouldRetry = true;
1✔
1053
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1✔
1054
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1✔
1055
        } // else no retry
1056
      } // else no retry
1057

1058
      return new RetryPlan(shouldRetry, backoffNanos);
1✔
1059
    }
1060

1061
    private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1062
      Integer pushbackMillis = getPushbackMills(trailer);
1✔
1063
      boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1✔
1064
      boolean isThrottled = false;
1✔
1065
      if (throttle != null) {
1✔
1066
        if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1✔
1067
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1✔
1068
        }
1069
      }
1070
      if (!isFatal && !isThrottled && !status.isOk()
1✔
1071
          && (pushbackMillis != null && pushbackMillis > 0)) {
1✔
1072
        pushbackMillis = 0; // We want the retry after a nonfatal error to be immediate
1✔
1073
      }
1074
      return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1✔
1075
    }
1076

1077
    @Nullable
1078
    private Integer getPushbackMills(Metadata trailer) {
1079
      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1✔
1080
      Integer pushbackMillis = null;
1✔
1081
      if (pushbackStr != null) {
1✔
1082
        try {
1083
          pushbackMillis = Integer.valueOf(pushbackStr);
1✔
1084
        } catch (NumberFormatException e) {
1✔
1085
          pushbackMillis = -1;
1✔
1086
        }
1✔
1087
      }
1088
      return pushbackMillis;
1✔
1089
    }
1090

1091
    @Override
1092
    public void messagesAvailable(final MessageProducer producer) {
1093
      State savedState = state;
1✔
1094
      checkState(
1✔
1095
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
1096
      if (savedState.winningSubstream != substream) {
1✔
1097
        GrpcUtil.closeQuietly(producer);
1✔
1098
        return;
1✔
1099
      }
1100
      listenerSerializeExecutor.execute(
1✔
1101
          new Runnable() {
1✔
1102
            @Override
1103
            public void run() {
1104
              masterListener.messagesAvailable(producer);
1✔
1105
            }
1✔
1106
          });
1107
    }
1✔
1108

1109
    @Override
1110
    public void onReady() {
1111
      // FIXME(#7089): hedging case is broken.
1112
      if (!isReady()) {
1✔
1113
        return;
1✔
1114
      }
1115
      listenerSerializeExecutor.execute(
1✔
1116
          new Runnable() {
1✔
1117
            @Override
1118
            public void run() {
1119
              if (!isClosed) {
1✔
1120
                masterListener.onReady();
1✔
1121
              }
1122
            }
1✔
1123
          });
1124
    }
1✔
1125
  }
1126

1127
  private static final class State {
1128
    /** Committed and the winning substream drained. */
1129
    final boolean passThrough;
1130

1131
    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1132
    @Nullable final List<BufferEntry> buffer;
1133

1134
    /**
1135
     * Unmodifiable collection of all the open substreams that are drained. Singleton once
1136
     * passThrough; Empty if committed but not passTrough.
1137
     */
1138
    final Collection<Substream> drainedSubstreams;
1139

1140
    /**
1141
     * Unmodifiable collection of all the active hedging substreams.
1142
     *
1143
     * <p>A substream even with the attribute substream.closed being true may be considered still
1144
     * "active" at the moment as long as it is in this collection.
1145
     */
1146
    final Collection<Substream> activeHedges; // not null once isHedging = true
1147

1148
    final int hedgingAttemptCount;
1149

1150
    /** Null until committed. */
1151
    @Nullable final Substream winningSubstream;
1152

1153
    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1154
    final boolean cancelled;
1155

1156
    /** No more hedging due to events like drop or pushback. */
1157
    final boolean hedgingFrozen;
1158

1159
    State(
1160
        @Nullable List<BufferEntry> buffer,
1161
        Collection<Substream> drainedSubstreams,
1162
        Collection<Substream> activeHedges,
1163
        @Nullable Substream winningSubstream,
1164
        boolean cancelled,
1165
        boolean passThrough,
1166
        boolean hedgingFrozen,
1167
        int hedgingAttemptCount) {
1✔
1168
      this.buffer = buffer;
1✔
1169
      this.drainedSubstreams =
1✔
1170
          checkNotNull(drainedSubstreams, "drainedSubstreams");
1✔
1171
      this.winningSubstream = winningSubstream;
1✔
1172
      this.activeHedges = activeHedges;
1✔
1173
      this.cancelled = cancelled;
1✔
1174
      this.passThrough = passThrough;
1✔
1175
      this.hedgingFrozen = hedgingFrozen;
1✔
1176
      this.hedgingAttemptCount = hedgingAttemptCount;
1✔
1177

1178
      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1✔
1179
      checkState(
1✔
1180
          !passThrough || winningSubstream != null,
1181
          "passThrough should imply winningSubstream != null");
1182
      checkState(
1✔
1183
          !passThrough
1184
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1✔
1185
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1✔
1186
          "passThrough should imply winningSubstream is drained");
1187
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1✔
1188
    }
1✔
1189

1190
    @CheckReturnValue
1191
    // GuardedBy RetriableStream.lock
1192
    State cancelled() {
1193
      return new State(
1✔
1194
          buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1195
          hedgingFrozen, hedgingAttemptCount);
1196
    }
1197

1198
    /** The given substream is drained. */
1199
    @CheckReturnValue
1200
    // GuardedBy RetriableStream.lock
1201
    State substreamDrained(Substream substream) {
1202
      checkState(!passThrough, "Already passThrough");
1✔
1203

1204
      Collection<Substream> drainedSubstreams;
1205
      
1206
      if (substream.closed) {
1✔
1207
        drainedSubstreams = this.drainedSubstreams;
1✔
1208
      } else if (this.drainedSubstreams.isEmpty()) {
1✔
1209
        // optimize for 0-retry, which is most of the cases.
1210
        drainedSubstreams = Collections.singletonList(substream);
1✔
1211
      } else {
1212
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1213
        drainedSubstreams.add(substream);
1✔
1214
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1215
      }
1216

1217
      boolean passThrough = winningSubstream != null;
1✔
1218

1219
      List<BufferEntry> buffer = this.buffer;
1✔
1220
      if (passThrough) {
1✔
1221
        checkState(
1✔
1222
            winningSubstream == substream, "Another RPC attempt has already committed");
1223
        buffer = null;
1✔
1224
      }
1225

1226
      return new State(
1✔
1227
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1228
          hedgingFrozen, hedgingAttemptCount);
1229
    }
1230

1231
    /** The given substream is closed. */
1232
    @CheckReturnValue
1233
    // GuardedBy RetriableStream.lock
1234
    State substreamClosed(Substream substream) {
1235
      substream.closed = true;
1✔
1236
      if (this.drainedSubstreams.contains(substream)) {
1✔
1237
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1✔
1238
        drainedSubstreams.remove(substream);
1✔
1239
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1✔
1240
        return new State(
1✔
1241
            buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1242
            hedgingFrozen, hedgingAttemptCount);
1243
      } else {
1244
        return this;
1✔
1245
      }
1246
    }
1247

1248
    @CheckReturnValue
1249
    // GuardedBy RetriableStream.lock
1250
    State committed(Substream winningSubstream) {
1251
      checkState(this.winningSubstream == null, "Already committed");
1✔
1252

1253
      boolean passThrough = false;
1✔
1254
      List<BufferEntry> buffer = this.buffer;
1✔
1255
      Collection<Substream> drainedSubstreams;
1256

1257
      if (this.drainedSubstreams.contains(winningSubstream)) {
1✔
1258
        passThrough = true;
1✔
1259
        buffer = null;
1✔
1260
        drainedSubstreams = Collections.singleton(winningSubstream);
1✔
1261
      } else {
1262
        drainedSubstreams = Collections.emptyList();
1✔
1263
      }
1264

1265
      return new State(
1✔
1266
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1267
          hedgingFrozen, hedgingAttemptCount);
1268
    }
1269

1270
    @CheckReturnValue
1271
    // GuardedBy RetriableStream.lock
1272
    State freezeHedging() {
1273
      if (hedgingFrozen) {
1✔
1274
        return this;
×
1275
      }
1276
      return new State(
1✔
1277
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1278
          true, hedgingAttemptCount);
1279
    }
1280

1281
    @CheckReturnValue
1282
    // GuardedBy RetriableStream.lock
1283
    // state.hedgingAttemptCount is modified only here.
1284
    // The method is only called in RetriableStream.start() and HedgingRunnable.run()
1285
    State addActiveHedge(Substream substream) {
1286
      // hasPotentialHedging must be true
1287
      checkState(!hedgingFrozen, "hedging frozen");
1✔
1288
      checkState(winningSubstream == null, "already committed");
1✔
1289

1290
      Collection<Substream> activeHedges;
1291
      if (this.activeHedges == null) {
1✔
1292
        activeHedges = Collections.singleton(substream);
1✔
1293
      } else {
1294
        activeHedges = new ArrayList<>(this.activeHedges);
1✔
1295
        activeHedges.add(substream);
1✔
1296
        activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1297
      }
1298

1299
      int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1✔
1300
      return new State(
1✔
1301
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1302
          hedgingFrozen, hedgingAttemptCount);
1303
    }
1304

1305
    @CheckReturnValue
1306
    // GuardedBy RetriableStream.lock
1307
    // The method is only called in Sublistener.closed()
1308
    State removeActiveHedge(Substream substream) {
1309
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1310
      activeHedges.remove(substream);
1✔
1311
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1312

1313
      return new State(
1✔
1314
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1315
          hedgingFrozen, hedgingAttemptCount);
1316
    }
1317

1318
    @CheckReturnValue
1319
    // GuardedBy RetriableStream.lock
1320
    // The method is only called for transparent retry.
1321
    State replaceActiveHedge(Substream oldOne, Substream newOne) {
1322
      Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1✔
1323
      activeHedges.remove(oldOne);
1✔
1324
      activeHedges.add(newOne);
1✔
1325
      activeHedges = Collections.unmodifiableCollection(activeHedges);
1✔
1326

1327
      return new State(
1✔
1328
          buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1329
          hedgingFrozen, hedgingAttemptCount);
1330
    }
1331
  }
1332

1333
  /**
1334
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1335
   *  attributes.
1336
   */
1337
  private static final class Substream {
1338
    ClientStream stream;
1339

1340
    // GuardedBy RetriableStream.lock
1341
    boolean closed;
1342

1343
    // setting to true must be GuardedBy RetriableStream.lock
1344
    boolean bufferLimitExceeded;
1345

1346
    final int previousAttemptCount;
1347

1348
    Substream(int previousAttemptCount) {
1✔
1349
      this.previousAttemptCount = previousAttemptCount;
1✔
1350
    }
1✔
1351
  }
1352

1353

1354
  /**
1355
   * Traces the buffer used by a substream.
1356
   */
1357
  class BufferSizeTracer extends ClientStreamTracer {
1358
    // Each buffer size tracer is dedicated to one specific substream.
1359
    private final Substream substream;
1360

1361
    @GuardedBy("lock")
1362
    long bufferNeeded;
1363

1364
    BufferSizeTracer(Substream substream) {
1✔
1365
      this.substream = substream;
1✔
1366
    }
1✔
1367

1368
    /**
1369
     * A message is sent to the wire, so its reference would be released if no retry or
1370
     * hedging were involved. So at this point we have to hold the reference of the message longer
1371
     * for retry, and we need to increment {@code substream.bufferNeeded}.
1372
     */
1373
    @Override
1374
    public void outboundWireSize(long bytes) {
1375
      if (state.winningSubstream != null) {
1✔
1376
        return;
1✔
1377
      }
1378

1379
      Runnable postCommitTask = null;
1✔
1380

1381
      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1382
      synchronized (lock) {
1✔
1383
        if (state.winningSubstream != null || substream.closed) {
1✔
1384
          return;
×
1385
        }
1386
        bufferNeeded += bytes;
1✔
1387
        if (bufferNeeded <= perRpcBufferUsed) {
1✔
1388
          return;
1✔
1389
        }
1390

1391
        if (bufferNeeded > perRpcBufferLimit) {
1✔
1392
          substream.bufferLimitExceeded = true;
1✔
1393
        } else {
1394
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1395
          long savedChannelBufferUsed =
1✔
1396
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1✔
1397
          perRpcBufferUsed = bufferNeeded;
1✔
1398

1399
          if (savedChannelBufferUsed > channelBufferLimit) {
1✔
1400
            substream.bufferLimitExceeded = true;
1✔
1401
          }
1402
        }
1403

1404
        if (substream.bufferLimitExceeded) {
1✔
1405
          postCommitTask = commit(substream);
1✔
1406
        }
1407
      }
1✔
1408

1409
      if (postCommitTask != null) {
1✔
1410
        postCommitTask.run();
1✔
1411
      }
1412
    }
1✔
1413
  }
1414

1415
  /**
1416
   *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1417
   *  the Channel. There should be a single instance of it for each channel.
1418
   */
1419
  static final class ChannelBufferMeter {
1✔
1420
    private final AtomicLong bufferUsed = new AtomicLong();
1✔
1421

1422
    @VisibleForTesting
1423
    long addAndGet(long newBytesUsed) {
1424
      return bufferUsed.addAndGet(newBytesUsed);
1✔
1425
    }
1426
  }
1427

1428
  /**
1429
   * Used for retry throttling.
1430
   */
1431
  static final class Throttle {
1432

1433
    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1434

1435
    /**
1436
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
1437
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1438
     * maxTokens.
1439
     */
1440
    final int maxTokens;
1441

1442
    /**
1443
     * Half of {@code maxTokens}.
1444
     */
1445
    final int threshold;
1446

1447
    /**
1448
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1449
     */
1450
    final int tokenRatio;
1451

1452
    final AtomicInteger tokenCount = new AtomicInteger();
1✔
1453

1454
    Throttle(float maxTokens, float tokenRatio) {
1✔
1455
      // tokenRatio is up to 3 decimal places
1456
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1457
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1458
      this.threshold = this.maxTokens / 2;
1✔
1459
      tokenCount.set(this.maxTokens);
1✔
1460
    }
1✔
1461

1462
    @VisibleForTesting
1463
    boolean isAboveThreshold() {
1464
      return tokenCount.get() > threshold;
1✔
1465
    }
1466

1467
    /**
1468
     * Counts down the token on qualified failure and checks if it is above the threshold
1469
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1470
     * a not-to-retry pushback.
1471
     */
1472
    @VisibleForTesting
1473
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1474
      while (true) {
1475
        int currentCount = tokenCount.get();
1✔
1476
        if (currentCount == 0) {
1✔
1477
          return false;
1✔
1478
        }
1479
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1✔
1480
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1✔
1481
        if (updated) {
1✔
1482
          return decremented > threshold;
1✔
1483
        }
1484
      }
×
1485
    }
1486

1487
    @VisibleForTesting
1488
    void onSuccess() {
1489
      while (true) {
1490
        int currentCount = tokenCount.get();
1✔
1491
        if (currentCount == maxTokens) {
1✔
1492
          break;
1✔
1493
        }
1494
        int incremented = currentCount + tokenRatio;
1✔
1495
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1✔
1496
        if (updated) {
1✔
1497
          break;
1✔
1498
        }
1499
      }
×
1500
    }
1✔
1501

1502
    @Override
1503
    public boolean equals(Object o) {
1504
      if (this == o) {
1✔
1505
        return true;
×
1506
      }
1507
      if (!(o instanceof Throttle)) {
1✔
1508
        return false;
×
1509
      }
1510
      Throttle that = (Throttle) o;
1✔
1511
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1✔
1512
    }
1513

1514
    @Override
1515
    public int hashCode() {
1516
      return Objects.hashCode(maxTokens, tokenRatio);
×
1517
    }
1518
  }
1519

1520
  private static final class RetryPlan {
1521
    final boolean shouldRetry;
1522
    final long backoffNanos;
1523

1524
    RetryPlan(boolean shouldRetry, long backoffNanos) {
1✔
1525
      this.shouldRetry = shouldRetry;
1✔
1526
      this.backoffNanos = backoffNanos;
1✔
1527
    }
1✔
1528
  }
1529

1530
  private static final class HedgingPlan {
1531
    final boolean isHedgeable;
1532
    @Nullable
1533
    final Integer hedgingPushbackMillis;
1534

1535
    public HedgingPlan(
1536
        boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1✔
1537
      this.isHedgeable = isHedgeable;
1✔
1538
      this.hedgingPushbackMillis = hedgingPushbackMillis;
1✔
1539
    }
1✔
1540
  }
1541

1542
  /** Allows cancelling a Future without racing with setting the future. */
1543
  private static final class FutureCanceller {
1544

1545
    final Object lock;
1546
    @GuardedBy("lock")
1547
    Future<?> future;
1548
    @GuardedBy("lock")
1549
    boolean cancelled;
1550

1551
    FutureCanceller(Object lock) {
1✔
1552
      this.lock = lock;
1✔
1553
    }
1✔
1554

1555
    void setFuture(Future<?> future) {
1556
      synchronized (lock) {
1✔
1557
        if (!cancelled) {
1✔
1558
          this.future = future;
1✔
1559
        }
1560
      }
1✔
1561
    }
1✔
1562

1563
    @GuardedBy("lock")
1564
    @CheckForNull // Must cancel the returned future if not null.
1565
    Future<?> markCancelled() {
1566
      cancelled = true;
1✔
1567
      return future;
1✔
1568
    }
1569

1570
    @GuardedBy("lock")
1571
    boolean isCancelled() {
1572
      return cancelled;
1✔
1573
    }
1574
  }
1575
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc