• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18783

pending completion
#18783

push

github-actions

web-flow
util: Outlier detection tracer delegation (#10459) (#10483)

OutlierDetectionLoadBalancer did not delegate calls to an existing
ClientStreamTracer from the tracer it installed. This change has the OD
tracer delegate all calls to the underlying one.

30639 of 34707 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.24
/../core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.util;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static java.util.concurrent.TimeUnit.NANOSECONDS;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ForwardingMap;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableSet;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.ChannelLogger.ChannelLogLevel;
31
import io.grpc.ClientStreamTracer;
32
import io.grpc.ClientStreamTracer.StreamInfo;
33
import io.grpc.ConnectivityState;
34
import io.grpc.ConnectivityStateInfo;
35
import io.grpc.EquivalentAddressGroup;
36
import io.grpc.Internal;
37
import io.grpc.LoadBalancer;
38
import io.grpc.Metadata;
39
import io.grpc.Status;
40
import io.grpc.SynchronizationContext;
41
import io.grpc.SynchronizationContext.ScheduledHandle;
42
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
43
import io.grpc.internal.TimeProvider;
44
import java.net.SocketAddress;
45
import java.util.ArrayList;
46
import java.util.Collection;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Random;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.atomic.AtomicLong;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Wraps a child {@code LoadBalancer} while monitoring for outlier backends and removing them from
59
 * the use of the child LB.
60
 *
61
 * <p>This implements the outlier detection gRFC:
62
 * https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md
63
 */
64
@Internal
65
public final class OutlierDetectionLoadBalancer extends LoadBalancer {
66

67
  @VisibleForTesting
68
  final AddressTrackerMap trackerMap;
69

70
  private final SynchronizationContext syncContext;
71
  private final Helper childHelper;
72
  private final GracefulSwitchLoadBalancer switchLb;
73
  private TimeProvider timeProvider;
74
  private final ScheduledExecutorService timeService;
75
  private ScheduledHandle detectionTimerHandle;
76
  private Long detectionTimerStartNanos;
77

78
  private final ChannelLogger logger;
79

80
  private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
1✔
81
      = Attributes.Key.create("addressTrackerKey");
1✔
82

83
  /**
   * Creates a new instance of {@link OutlierDetectionLoadBalancer}.
   *
   * @param helper the parent helper; it supplies the channel logger, synchronization context and
   *     scheduled executor, and is wrapped in a {@link ChildHelper} for the child policy
   * @param timeProvider the clock used to timestamp detection timer runs
   * @throws NullPointerException if {@code helper}, its synchronization context or its scheduled
   *     executor service is null
   */
  public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) {
    // Validate the helper before its first use so a null helper fails with the intended message
    // instead of a bare NullPointerException from the logger lookup.
    checkNotNull(helper, "helper");
    logger = helper.getChannelLogger();
    childHelper = new ChildHelper(helper);
    switchLb = new GracefulSwitchLoadBalancer(childHelper);
    trackerMap = new AddressTrackerMap();
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
    this.timeProvider = timeProvider;
    logger.log(ChannelLogLevel.DEBUG, "OutlierDetection lb created.");
  }
1✔
96

97
  @Override
98
  public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
99
    logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
100
    OutlierDetectionLoadBalancerConfig config
1✔
101
        = (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
102

103
    // The map should only retain entries for addresses in this latest update.
104
    ArrayList<SocketAddress> addresses = new ArrayList<>();
1✔
105
    for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) {
1✔
106
      addresses.addAll(addressGroup.getAddresses());
1✔
107
    }
1✔
108
    trackerMap.keySet().retainAll(addresses);
1✔
109

110
    trackerMap.updateTrackerConfigs(config);
1✔
111

112
    // Add any new ones.
113
    trackerMap.putNewTrackers(config, addresses);
1✔
114

115
    switchLb.switchTo(config.childPolicy.getProvider());
1✔
116

117
    // If outlier detection is actually configured, start a timer that will periodically try to
118
    // detect outliers.
119
    if (config.outlierDetectionEnabled()) {
1✔
120
      Long initialDelayNanos;
121

122
      if (detectionTimerStartNanos == null) {
1✔
123
        // On the first go we use the configured interval.
124
        initialDelayNanos = config.intervalNanos;
1✔
125
      } else {
126
        // If a timer has started earlier we cancel it and use the difference between the start
127
        // time and now as the interval.
128
        initialDelayNanos = Math.max(0L,
1✔
129
            config.intervalNanos - (timeProvider.currentTimeNanos() - detectionTimerStartNanos));
1✔
130
      }
131

132
      // If a timer has been previously created we need to cancel it and reset all the call counters
133
      // for a fresh start.
134
      if (detectionTimerHandle != null) {
1✔
135
        detectionTimerHandle.cancel();
1✔
136
        trackerMap.resetCallCounters();
1✔
137
      }
138

139
      detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger),
1✔
140
          initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService);
1✔
141
    } else if (detectionTimerHandle != null) {
1✔
142
      // Outlier detection is not configured, but we have a lingering timer. Let's cancel it and
143
      // uneject any addresses we may have ejected.
144
      detectionTimerHandle.cancel();
1✔
145
      detectionTimerStartNanos = null;
1✔
146
      trackerMap.cancelTracking();
1✔
147
    }
148

149
    switchLb.handleResolvedAddresses(
1✔
150
        resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
1✔
151
            .build());
1✔
152
    return true;
1✔
153
  }
154

155
  @Override
156
  public void handleNameResolutionError(Status error) {
157
    switchLb.handleNameResolutionError(error);
1✔
158
  }
1✔
159

160
  @Override
161
  public void shutdown() {
162
    switchLb.shutdown();
1✔
163
  }
1✔
164

165
  /**
166
   * This timer will be invoked periodically, according to configuration, and it will look for any
167
   * outlier subchannels.
168
   */
169
  class DetectionTimer implements Runnable {
170

171
    OutlierDetectionLoadBalancerConfig config;
172
    ChannelLogger logger;
173

174
    DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger) {
1✔
175
      this.config = config;
1✔
176
      this.logger = logger;
1✔
177
    }
1✔
178

179
    @Override
180
    public void run() {
181
      detectionTimerStartNanos = timeProvider.currentTimeNanos();
1✔
182

183
      trackerMap.swapCounters();
1✔
184

185
      for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) {
1✔
186
        algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
1✔
187
      }
1✔
188

189
      trackerMap.maybeUnejectOutliers(detectionTimerStartNanos);
1✔
190
    }
1✔
191
  }
192

193
  /**
194
   * This child helper wraps the provided helper so that it can hand out wrapped {@link
195
   * OutlierDetectionSubchannel}s and manage the address info map.
196
   */
197
  class ChildHelper extends ForwardingLoadBalancerHelper {
198

199
    private Helper delegate;
200

201
    ChildHelper(Helper delegate) {
1✔
202
      this.delegate = delegate;
1✔
203
    }
1✔
204

205
    @Override
206
    protected Helper delegate() {
207
      return delegate;
1✔
208
    }
209

210
    @Override
211
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
212
      // Subchannels are wrapped so that we can monitor call results and to trigger failures when
213
      // we decide to eject the subchannel.
214
      OutlierDetectionSubchannel subchannel = new OutlierDetectionSubchannel(
1✔
215
          delegate.createSubchannel(args));
1✔
216

217
      // If the subchannel is associated with a single address that is also already in the map
218
      // the subchannel will be added to the map and be included in outlier detection.
219
      List<EquivalentAddressGroup> addressGroups = args.getAddresses();
1✔
220
      if (hasSingleAddress(addressGroups)
1✔
221
          && trackerMap.containsKey(addressGroups.get(0).getAddresses().get(0))) {
1✔
222
        AddressTracker tracker = trackerMap.get(addressGroups.get(0).getAddresses().get(0));
1✔
223
        tracker.addSubchannel(subchannel);
1✔
224

225
        // If this address has already been ejected, we need to immediately eject this Subchannel.
226
        if (tracker.ejectionTimeNanos != null) {
1✔
227
          subchannel.eject();
×
228
        }
229
      }
230

231
      return subchannel;
1✔
232
    }
233

234
    @Override
235
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
236
      delegate.updateBalancingState(newState, new OutlierDetectionPicker(newPicker));
1✔
237
    }
1✔
238
  }
239

240
  class OutlierDetectionSubchannel extends ForwardingSubchannel {
241

242
    private final Subchannel delegate;
243
    private AddressTracker addressTracker;
244
    private boolean ejected;
245
    private ConnectivityStateInfo lastSubchannelState;
246
    private SubchannelStateListener subchannelStateListener;
247
    private final ChannelLogger logger;
248

249
    OutlierDetectionSubchannel(Subchannel delegate) {
1✔
250
      this.delegate = delegate;
1✔
251
      this.logger = delegate.getChannelLogger();
1✔
252
    }
1✔
253

254
    @Override
255
    public void start(SubchannelStateListener listener) {
256
      subchannelStateListener = listener;
1✔
257
      super.start(new OutlierDetectionSubchannelStateListener(listener));
1✔
258
    }
1✔
259

260
    @Override
261
    public Attributes getAttributes() {
262
      if (addressTracker != null) {
1✔
263
        return delegate.getAttributes().toBuilder().set(ADDRESS_TRACKER_ATTR_KEY, addressTracker)
1✔
264
            .build();
1✔
265
      } else {
266
        return delegate.getAttributes();
1✔
267
      }
268
    }
269

270
    @Override
271
    public void updateAddresses(List<EquivalentAddressGroup> addressGroups) {
272
      // Outlier detection only supports subchannels with a single address, but the list of
273
      // addressGroups associated with a subchannel can change at any time, so we need to react to
274
      // changes in the address list plurality.
275

276
      // No change in address plurality, we replace the single one with a new one.
277
      if (hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) {
1✔
278
        // Remove the current subchannel from the old address it is associated with in the map.
279
        if (trackerMap.containsValue(addressTracker)) {
1✔
280
          addressTracker.removeSubchannel(this);
1✔
281
        }
282

283
        // If the map has an entry for the new address, we associate this subchannel with it.
284
        SocketAddress address = addressGroups.get(0).getAddresses().get(0);
1✔
285
        if (trackerMap.containsKey(address)) {
1✔
286
          trackerMap.get(address).addSubchannel(this);
1✔
287
        }
288
      } else if (hasSingleAddress(getAllAddresses()) && !hasSingleAddress(addressGroups)) {
1✔
289
        // We go from a single address to having multiple, making this subchannel uneligible for
290
        // outlier detection. Remove it from all trackers and reset the call counters of all the
291
        // associated trackers.
292
        // Remove the current subchannel from the old address it is associated with in the map.
293
        if (trackerMap.containsKey(getAddresses().getAddresses().get(0))) {
1✔
294
          AddressTracker tracker = trackerMap.get(getAddresses().getAddresses().get(0));
1✔
295
          tracker.removeSubchannel(this);
1✔
296
          tracker.resetCallCounters();
1✔
297
        }
1✔
298
      } else if (!hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) {
1✔
299
        // We go from, previously uneligble, multiple address mode to a single address. If the map
300
        // has an entry for the new address, we associate this subchannel with it.
301
        SocketAddress address = addressGroups.get(0).getAddresses().get(0);
1✔
302
        if (trackerMap.containsKey(address)) {
1✔
303
          AddressTracker tracker = trackerMap.get(address);
1✔
304
          tracker.addSubchannel(this);
1✔
305
        }
306
      }
307

308
      // We could also have multiple addressGroups and get an update for multiple new ones. This is
309
      // a no-op as we will just continue to ignore multiple address subchannels.
310

311
      delegate.updateAddresses(addressGroups);
1✔
312
    }
1✔
313

314
    /**
315
     * If the {@link Subchannel} is considered for outlier detection the associated {@link
316
     * AddressTracker} should be set.
317
     */
318
    void setAddressTracker(AddressTracker addressTracker) {
319
      this.addressTracker = addressTracker;
1✔
320
    }
1✔
321

322
    void clearAddressTracker() {
323
      this.addressTracker = null;
1✔
324
    }
1✔
325

326
    void eject() {
327
      ejected = true;
1✔
328
      subchannelStateListener.onSubchannelState(
1✔
329
          ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
1✔
330
      logger.log(ChannelLogLevel.INFO, "Subchannel ejected: {0}", this);
1✔
331
    }
1✔
332

333
    void uneject() {
334
      ejected = false;
1✔
335
      if (lastSubchannelState != null) {
1✔
336
        subchannelStateListener.onSubchannelState(lastSubchannelState);
1✔
337
        logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
1✔
338
      }
339
    }
1✔
340

341
    boolean isEjected() {
342
      return ejected;
1✔
343
    }
344

345
    @Override
346
    protected Subchannel delegate() {
347
      return delegate;
1✔
348
    }
349

350
    /**
351
     * Wraps the actual listener so that state changes from the actual one can be intercepted.
352
     */
353
    class OutlierDetectionSubchannelStateListener implements SubchannelStateListener {
354

355
      private final SubchannelStateListener delegate;
356

357
      OutlierDetectionSubchannelStateListener(SubchannelStateListener delegate) {
1✔
358
        this.delegate = delegate;
1✔
359
      }
1✔
360

361
      @Override
362
      public void onSubchannelState(ConnectivityStateInfo newState) {
363
        lastSubchannelState = newState;
1✔
364
        if (!ejected) {
1✔
365
          delegate.onSubchannelState(newState);
1✔
366
        }
367
      }
1✔
368
    }
369

370
    @Override
371
    public String toString() {
372
      return "OutlierDetectionSubchannel{"
×
373
              + "addresses=" + delegate.getAllAddresses()
×
374
              + '}';
375
    }
376
  }
377

378

379
  /**
380
   * This picker delegates the actual picking logic to a wrapped delegate, but associates a {@link
381
   * ClientStreamTracer} with each pick to track the results of each subchannel stream.
382
   */
383
  class OutlierDetectionPicker extends SubchannelPicker {
384

385
    private final SubchannelPicker delegate;
386

387
    OutlierDetectionPicker(SubchannelPicker delegate) {
1✔
388
      this.delegate = delegate;
1✔
389
    }
1✔
390

391
    @Override
392
    public PickResult pickSubchannel(PickSubchannelArgs args) {
393
      PickResult pickResult = delegate.pickSubchannel(args);
1✔
394

395
      Subchannel subchannel = pickResult.getSubchannel();
1✔
396
      if (subchannel != null) {
1✔
397
        return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory(
1✔
398
            subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY),
1✔
399
            pickResult.getStreamTracerFactory()));
1✔
400
      }
401

402
      return pickResult;
×
403
    }
404

405
    /**
406
     * Builds instances of a {@link ClientStreamTracer} that increments the call count in the
407
     * tracker for each closed stream.
408
     */
409
    class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory {
410

411
      private final AddressTracker tracker;
412

413
      @Nullable
414
      private final ClientStreamTracer.Factory delegateFactory;
415

416
      ResultCountingClientStreamTracerFactory(AddressTracker tracker,
417
          @Nullable ClientStreamTracer.Factory delegateFactory) {
1✔
418
        this.tracker = tracker;
1✔
419
        this.delegateFactory = delegateFactory;
1✔
420
      }
1✔
421

422
      @Override
423
      public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
424
        if (delegateFactory != null) {
1✔
425
          ClientStreamTracer delegateTracer = delegateFactory.newClientStreamTracer(info, headers);
1✔
426
          return new ForwardingClientStreamTracer() {
1✔
427
            @Override
428
            protected ClientStreamTracer delegate() {
429
              return delegateTracer;
1✔
430
            }
431

432
            @Override
433
            public void streamClosed(Status status) {
434
              tracker.incrementCallCount(status.isOk());
1✔
435
              delegate().streamClosed(status);
1✔
436
            }
1✔
437
          };
438
        } else {
439
          return new ClientStreamTracer() {
1✔
440
            @Override
441
            public void streamClosed(Status status) {
442
              tracker.incrementCallCount(status.isOk());
1✔
443
            }
1✔
444
          };
445
        }
446
      }
447
    }
448
  }
449

450
  /**
451
   * Tracks additional information about a set of equivalent addresses needed for outlier
452
   * detection.
453
   */
454
  static class AddressTracker {
455

456
    private OutlierDetectionLoadBalancerConfig config;
457
    // Marked as volatile to assure that when the inactive counter is swapped in as the new active
458
    // one, all threads see the change and don't hold on to a reference to the now inactive counter.
459
    private volatile CallCounter activeCallCounter = new CallCounter();
1✔
460
    private CallCounter inactiveCallCounter = new CallCounter();
1✔
461
    private Long ejectionTimeNanos;
462
    private int ejectionTimeMultiplier;
463
    private final Set<OutlierDetectionSubchannel> subchannels = new HashSet<>();
1✔
464

465
    AddressTracker(OutlierDetectionLoadBalancerConfig config) {
1✔
466
      this.config = config;
1✔
467
    }
1✔
468

469
    void setConfig(OutlierDetectionLoadBalancerConfig config) {
470
      this.config = config;
1✔
471
    }
1✔
472

473
    /**
474
     * Adds a subchannel to the tracker, while assuring that the subchannel ejection status is
475
     * updated to match the tracker's if needed.
476
     */
477
    boolean addSubchannel(OutlierDetectionSubchannel subchannel) {
478
      // Make sure that the subchannel is in the same ejection state as the new tracker it is
479
      // associated with.
480
      if (subchannelsEjected() && !subchannel.isEjected()) {
1✔
481
        subchannel.eject();
×
482
      } else if (!subchannelsEjected() && subchannel.isEjected()) {
1✔
483
        subchannel.uneject();
1✔
484
      }
485
      subchannel.setAddressTracker(this);
1✔
486
      return subchannels.add(subchannel);
1✔
487
    }
488

489
    boolean removeSubchannel(OutlierDetectionSubchannel subchannel) {
490
      subchannel.clearAddressTracker();
1✔
491
      return subchannels.remove(subchannel);
1✔
492
    }
493

494
    boolean containsSubchannel(OutlierDetectionSubchannel subchannel) {
495
      return subchannels.contains(subchannel);
×
496
    }
497

498
    @VisibleForTesting
499
    Set<OutlierDetectionSubchannel> getSubchannels() {
500
      return ImmutableSet.copyOf(subchannels);
1✔
501
    }
502

503
    void incrementCallCount(boolean success) {
504
      // If neither algorithm is configured, no point in incrementing counters.
505
      if (config.successRateEjection == null && config.failurePercentageEjection == null) {
1✔
506
        return;
×
507
      }
508

509
      if (success) {
1✔
510
        activeCallCounter.successCount.getAndIncrement();
1✔
511
      } else {
512
        activeCallCounter.failureCount.getAndIncrement();
1✔
513
      }
514
    }
1✔
515

516
    @VisibleForTesting
517
    long activeVolume() {
518
      return activeCallCounter.successCount.get() + activeCallCounter.failureCount.get();
1✔
519
    }
520

521
    long inactiveVolume() {
522
      return inactiveCallCounter.successCount.get() + inactiveCallCounter.failureCount.get();
1✔
523
    }
524

525
    double successRate() {
526
      return ((double) inactiveCallCounter.successCount.get()) / inactiveVolume();
1✔
527
    }
528

529
    double failureRate() {
530
      return ((double)inactiveCallCounter.failureCount.get()) / inactiveVolume();
1✔
531
    }
532

533
    void resetCallCounters() {
534
      activeCallCounter.reset();
1✔
535
      inactiveCallCounter.reset();
1✔
536
    }
1✔
537

538
    void decrementEjectionTimeMultiplier() {
539
      // The multiplier should not go negative.
540
      ejectionTimeMultiplier = ejectionTimeMultiplier == 0 ? 0 : ejectionTimeMultiplier - 1;
1✔
541
    }
1✔
542

543
    void resetEjectionTimeMultiplier() {
544
      ejectionTimeMultiplier = 0;
1✔
545
    }
1✔
546

547
    /**
548
     * Swaps the active and inactive counters.
549
     *
550
     * <p>Note that this method is not thread safe as the swap is not done atomically. This is
551
     * expected to only be called from the timer that is scheduled at a fixed delay, assuring that
552
     * only one timer is active at a time.
553
     */
554
    void swapCounters() {
555
      inactiveCallCounter.reset();
1✔
556
      CallCounter tempCounter = activeCallCounter;
1✔
557
      activeCallCounter = inactiveCallCounter;
1✔
558
      inactiveCallCounter = tempCounter;
1✔
559
    }
1✔
560

561
    void ejectSubchannels(long ejectionTimeNanos) {
562
      this.ejectionTimeNanos = ejectionTimeNanos;
1✔
563
      ejectionTimeMultiplier++;
1✔
564
      for (OutlierDetectionSubchannel subchannel : subchannels) {
1✔
565
        subchannel.eject();
1✔
566
      }
1✔
567
    }
1✔
568

569
    /**
570
     * Uneject a currently ejected address.
571
     */
572
    void unejectSubchannels() {
573
      checkState(ejectionTimeNanos != null, "not currently ejected");
1✔
574
      ejectionTimeNanos = null;
1✔
575
      for (OutlierDetectionSubchannel subchannel : subchannels) {
1✔
576
        subchannel.uneject();
1✔
577
      }
1✔
578
    }
1✔
579

580
    boolean subchannelsEjected() {
581
      return ejectionTimeNanos != null;
1✔
582
    }
583

584
    public boolean maxEjectionTimeElapsed(long currentTimeNanos) {
585
      // The instant in time beyond which the address should no longer be ejected. Also making sure
586
      // we honor any maximum ejection time setting.
587
      long maxEjectionDurationSecs
1✔
588
          = Math.max(config.baseEjectionTimeNanos, config.maxEjectionTimeNanos);
1✔
589
      long maxEjectionTimeNanos =
1✔
590
          ejectionTimeNanos + Math.min(
1✔
591
              config.baseEjectionTimeNanos * ejectionTimeMultiplier,
1✔
592
              maxEjectionDurationSecs);
593

594
      return currentTimeNanos > maxEjectionTimeNanos;
1✔
595
    }
596

597
    /** Tracks both successful and failed call counts. */
598
    private static class CallCounter {
1✔
599
      AtomicLong successCount = new AtomicLong();
1✔
600
      AtomicLong failureCount = new AtomicLong();
1✔
601

602
      void reset() {
603
        successCount.set(0);
1✔
604
        failureCount.set(0);
1✔
605
      }
1✔
606
    }
607

608
    @Override
609
    public String toString() {
610
      return "AddressTracker{"
×
611
              + "subchannels=" + subchannels
612
              + '}';
613
    }
614
  }
615

616
  /**
617
   * Maintains a mapping from addresses to their trackers.
618
   */
619
  static class AddressTrackerMap extends ForwardingMap<SocketAddress, AddressTracker> {
620
    private final Map<SocketAddress, AddressTracker> trackerMap;
621

622
    AddressTrackerMap() {
1✔
623
      trackerMap = new HashMap<>();
1✔
624
    }
1✔
625

626
    @Override
627
    protected Map<SocketAddress, AddressTracker> delegate() {
628
      return trackerMap;
1✔
629
    }
630

631
    void updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config) {
632
      for (AddressTracker tracker: trackerMap.values()) {
1✔
633
        tracker.setConfig(config);
1✔
634
      }
1✔
635
    }
1✔
636

637
    /** Adds a new tracker for every given address. */
638
    void putNewTrackers(OutlierDetectionLoadBalancerConfig config,
639
        Collection<SocketAddress> addresses) {
640
      for (SocketAddress address : addresses) {
1✔
641
        if (!trackerMap.containsKey(address)) {
1✔
642
          trackerMap.put(address, new AddressTracker(config));
1✔
643
        }
644
      }
1✔
645
    }
1✔
646

647
    /** Resets the call counters for all the trackers in the map. */
648
    void resetCallCounters() {
649
      for (AddressTracker tracker : trackerMap.values()) {
1✔
650
        tracker.resetCallCounters();
1✔
651
      }
1✔
652
    }
1✔
653

654
    /**
655
     * When OD gets disabled we need to uneject any subchannels that may have been ejected and
656
     * to reset the ejection time multiplier.
657
     */
658
    void cancelTracking() {
659
      for (AddressTracker tracker : trackerMap.values()) {
1✔
660
        if (tracker.subchannelsEjected()) {
1✔
661
          tracker.unejectSubchannels();
×
662
        }
663
        tracker.resetEjectionTimeMultiplier();
1✔
664
      }
1✔
665
    }
1✔
666

667
    /** Swaps the active and inactive counters for each tracker. */
668
    void swapCounters() {
669
      for (AddressTracker tracker : trackerMap.values()) {
1✔
670
        tracker.swapCounters();
1✔
671
      }
1✔
672
    }
1✔
673

674
    /**
675
     * At the end of a timer run we need to decrement the ejection time multiplier for trackers
676
     * that don't have ejected subchannels and uneject ones that have spent the maximum ejection
677
     * time allowed.
678
     */
679
    void maybeUnejectOutliers(Long detectionTimerStartNanos) {
680
      for (AddressTracker tracker : trackerMap.values()) {
1✔
681
        if (!tracker.subchannelsEjected()) {
1✔
682
          tracker.decrementEjectionTimeMultiplier();
1✔
683
        }
684

685
        if (tracker.subchannelsEjected() && tracker.maxEjectionTimeElapsed(
1✔
686
            detectionTimerStartNanos)) {
1✔
687
          tracker.unejectSubchannels();
1✔
688
        }
689
      }
1✔
690
    }
1✔
691

692
    /**
693
     * How many percent of the addresses have been ejected.
694
     */
695
    double ejectionPercentage() {
696
      if (trackerMap.isEmpty()) {
1✔
697
        return 0;
×
698
      }
699
      int totalAddresses = 0;
1✔
700
      int ejectedAddresses = 0;
1✔
701
      for (AddressTracker tracker : trackerMap.values()) {
1✔
702
        totalAddresses++;
1✔
703
        if (tracker.subchannelsEjected()) {
1✔
704
          ejectedAddresses++;
1✔
705
        }
706
      }
1✔
707
      return ((double)ejectedAddresses / totalAddresses) * 100;
1✔
708
    }
709
  }
710

711

712
  /**
713
   * Implementations provide different ways of ejecting outlier addresses..
714
   */
715
  interface OutlierEjectionAlgorithm {
716

717
    /** Eject any outlier addresses. */
718
    void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos);
719

720
    /** Builds a list of algorithms that are enabled in the given config. */
721
    @Nullable
722
    static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config,
723
                                                    ChannelLogger logger) {
724
      ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder();
1✔
725
      if (config.successRateEjection != null) {
1✔
726
        algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config, logger));
1✔
727
      }
728
      if (config.failurePercentageEjection != null) {
1✔
729
        algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config, logger));
1✔
730
      }
731
      return algoListBuilder.build();
1✔
732
    }
733
  }
734

735
  /**
736
   * This algorithm ejects addresses that don't maintain a required rate of successful calls. The
737
   * required rate is not fixed, but is based on the mean and standard deviation of the success
738
   * rates of all of the addresses.
739
   */
740
  static class SuccessRateOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm {
741

742
    private final OutlierDetectionLoadBalancerConfig config;
743

744
    private final ChannelLogger logger;
745

746
    SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
747
                                        ChannelLogger logger) {
1✔
748
      checkArgument(config.successRateEjection != null, "success rate ejection config is null");
1✔
749
      this.config = config;
1✔
750
      this.logger = logger;
1✔
751
    }
1✔
752

753
    @Override
754
    public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
755

756
      // Only consider addresses that have the minimum request volume specified in the config.
757
      List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
1✔
758
          config.successRateEjection.requestVolume);
1✔
759
      // If we don't have enough addresses with significant volume then there's nothing to do.
760
      if (trackersWithVolume.size() < config.successRateEjection.minimumHosts
1✔
761
          || trackersWithVolume.size() == 0) {
1✔
762
        return;
1✔
763
      }
764

765
      // Calculate mean and standard deviation of the fractions of successful calls.
766
      List<Double> successRates = new ArrayList<>();
1✔
767
      for (AddressTracker tracker : trackersWithVolume) {
1✔
768
        successRates.add(tracker.successRate());
1✔
769
      }
1✔
770
      double mean = mean(successRates);
1✔
771
      double stdev = standardDeviation(successRates, mean);
1✔
772

773
      double requiredSuccessRate =
1✔
774
          mean - stdev * (config.successRateEjection.stdevFactor / 1000f);
1✔
775

776
      for (AddressTracker tracker : trackersWithVolume) {
1✔
777
        // If we are above or equal to the max ejection percentage, don't eject any more. This will
778
        // allow the total ejections to go one above the max, but at the same time it assures at
779
        // least one ejection, which the spec calls for. This behavior matches what Envoy proxy
780
        // does.
781
        if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) {
1✔
782
          return;
1✔
783
        }
784

785
        // If success rate is below the threshold, eject the address.
786
        if (tracker.successRate() < requiredSuccessRate) {
1✔
787
          logger.log(ChannelLogLevel.DEBUG,
1✔
788
                  "SuccessRate algorithm detected outlier: {0}. "
789
                          + "Parameters: successRate={1}, mean={2}, stdev={3}, "
790
                          + "requiredSuccessRate={4}",
791
                  tracker, tracker.successRate(),  mean, stdev, requiredSuccessRate);
1✔
792
          // Only eject some addresses based on the enforcement percentage.
793
          if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
1✔
794
            tracker.ejectSubchannels(ejectionTimeNanos);
1✔
795
          }
796
        }
797
      }
1✔
798
    }
1✔
799

800
    /** Calculates the mean of the given values. */
801
    @VisibleForTesting
802
    static double mean(Collection<Double> values) {
803
      double totalValue = 0;
1✔
804
      for (double value : values) {
1✔
805
        totalValue += value;
1✔
806
      }
1✔
807

808
      return totalValue / values.size();
1✔
809
    }
810

811
    /** Calculates the standard deviation for the given values and their mean. */
812
    @VisibleForTesting
813
    static double standardDeviation(Collection<Double> values, double mean) {
814
      double squaredDifferenceSum = 0;
1✔
815
      for (double value : values) {
1✔
816
        double difference = value - mean;
1✔
817
        squaredDifferenceSum += difference * difference;
1✔
818
      }
1✔
819
      double variance = squaredDifferenceSum / values.size();
1✔
820

821
      return Math.sqrt(variance);
1✔
822
    }
823
  }
824

825
  static class FailurePercentageOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm {
826

827
    private final OutlierDetectionLoadBalancerConfig config;
828

829
    private final ChannelLogger logger;
830

831
    FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
832
                                              ChannelLogger logger) {
1✔
833
      this.config = config;
1✔
834
      this.logger = logger;
1✔
835
    }
1✔
836

837
    @Override
838
    public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
839

840
      // Only consider addresses that have the minimum request volume specified in the config.
841
      List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
1✔
842
          config.failurePercentageEjection.requestVolume);
1✔
843
      // If we don't have enough addresses with significant volume then there's nothing to do.
844
      if (trackersWithVolume.size() < config.failurePercentageEjection.minimumHosts
1✔
845
          || trackersWithVolume.size() == 0) {
1✔
846
        return;
1✔
847
      }
848

849
      // If this address does not have enough volume to be considered, skip to the next one.
850
      for (AddressTracker tracker : trackersWithVolume) {
1✔
851
        // If we are above or equal to the max ejection percentage, don't eject any more. This will
852
        // allow the total ejections to go one above the max, but at the same time it assures at
853
        // least one ejection, which the spec calls for. This behavior matches what Envoy proxy
854
        // does.
855
        if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) {
1✔
856
          return;
×
857
        }
858

859
        if (tracker.inactiveVolume() < config.failurePercentageEjection.requestVolume) {
1✔
860
          continue;
×
861
        }
862

863
        // If the failure rate is above the threshold, we should eject...
864
        double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100;
1✔
865
        if (tracker.failureRate() > maxFailureRate) {
1✔
866
          logger.log(ChannelLogLevel.DEBUG,
1✔
867
                  "FailurePercentage algorithm detected outlier: {0}, failureRate={1}",
868
                  tracker, tracker.failureRate());
1✔
869
          // ...but only enforce this based on the enforcement percentage.
870
          if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) {
1✔
871
            tracker.ejectSubchannels(ejectionTimeNanos);
1✔
872
          }
873
        }
874
      }
1✔
875
    }
1✔
876
  }
877

878
  /** Returns only the trackers that have the minimum configured volume to be considered. */
879
  private static List<AddressTracker> trackersWithVolume(AddressTrackerMap trackerMap,
880
      int volume) {
881
    List<AddressTracker> trackersWithVolume = new ArrayList<>();
1✔
882
    for (AddressTracker tracker : trackerMap.values()) {
1✔
883
      if (tracker.inactiveVolume() >= volume) {
1✔
884
        trackersWithVolume.add(tracker);
1✔
885
      }
886
    }
1✔
887
    return trackersWithVolume;
1✔
888
  }
889

890
  /** Counts how many addresses are in a given address group. */
891
  private static boolean hasSingleAddress(List<EquivalentAddressGroup> addressGroups) {
892
    int addressCount = 0;
1✔
893
    for (EquivalentAddressGroup addressGroup : addressGroups) {
1✔
894
      addressCount += addressGroup.getAddresses().size();
1✔
895
      if (addressCount > 1) {
1✔
896
        return false;
1✔
897
      }
898
    }
1✔
899
    return true;
1✔
900
  }
901

902
  /**
903
   * The configuration for {@link OutlierDetectionLoadBalancer}.
904
   */
905
  public static final class OutlierDetectionLoadBalancerConfig {
906

907
    public final Long intervalNanos;
908
    public final Long baseEjectionTimeNanos;
909
    public final Long maxEjectionTimeNanos;
910
    public final Integer maxEjectionPercent;
911
    public final SuccessRateEjection successRateEjection;
912
    public final FailurePercentageEjection failurePercentageEjection;
913
    public final PolicySelection childPolicy;
914

915
    /**
     * Stores the given values as they are; no validation happens here.
     *
     * @param intervalNanos the interval between outlier detection sweeps
     * @param baseEjectionTimeNanos the base time an address is ejected for
     * @param maxEjectionTimeNanos the longest time an address can be ejected
     * @param maxEjectionPercent the maximum percentage of addresses that can be ejected
     * @param successRateEjection the success rate ejection settings, may be {@code null}
     * @param failurePercentageEjection the failure percentage ejection settings, may be
     *     {@code null}
     * @param childPolicy the policy that is delegated to
     */
    private OutlierDetectionLoadBalancerConfig(Long intervalNanos,
        Long baseEjectionTimeNanos,
        Long maxEjectionTimeNanos,
        Integer maxEjectionPercent,
        SuccessRateEjection successRateEjection,
        FailurePercentageEjection failurePercentageEjection,
        PolicySelection childPolicy) {
      // Delegation target.
      this.childPolicy = childPolicy;

      // Timing and limits shared by all algorithms.
      this.intervalNanos = intervalNanos;
      this.baseEjectionTimeNanos = baseEjectionTimeNanos;
      this.maxEjectionTimeNanos = maxEjectionTimeNanos;
      this.maxEjectionPercent = maxEjectionPercent;

      // Per-algorithm settings.
      this.successRateEjection = successRateEjection;
      this.failurePercentageEjection = failurePercentageEjection;
    }
1✔
930

931
    /** Builds a new {@link OutlierDetectionLoadBalancerConfig}. */
932
    public static class Builder {
1✔
933
      Long intervalNanos = 10_000_000_000L; // 10s
1✔
934
      Long baseEjectionTimeNanos = 30_000_000_000L; // 30s
1✔
935
      Long maxEjectionTimeNanos = 300_000_000_000L; // 300s
1✔
936
      Integer maxEjectionPercent = 10;
1✔
937
      SuccessRateEjection successRateEjection;
938
      FailurePercentageEjection failurePercentageEjection;
939
      PolicySelection childPolicy;
940

941
      /** The interval between outlier detection sweeps. */
942
      public Builder setIntervalNanos(Long intervalNanos) {
943
        checkArgument(intervalNanos != null);
1✔
944
        this.intervalNanos = intervalNanos;
1✔
945
        return this;
1✔
946
      }
947

948
      /** The base time an address is ejected for. */
949
      public Builder setBaseEjectionTimeNanos(Long baseEjectionTimeNanos) {
950
        checkArgument(baseEjectionTimeNanos != null);
1✔
951
        this.baseEjectionTimeNanos = baseEjectionTimeNanos;
1✔
952
        return this;
1✔
953
      }
954

955
      /** The longest time an address can be ejected. */
956
      public Builder setMaxEjectionTimeNanos(Long maxEjectionTimeNanos) {
957
        checkArgument(maxEjectionTimeNanos != null);
1✔
958
        this.maxEjectionTimeNanos = maxEjectionTimeNanos;
1✔
959
        return this;
1✔
960
      }
961

962
      /** The algorithm agnostic maximum percentage of addresses that can be ejected. */
963
      public Builder setMaxEjectionPercent(Integer maxEjectionPercent) {
964
        checkArgument(maxEjectionPercent != null);
1✔
965
        this.maxEjectionPercent = maxEjectionPercent;
1✔
966
        return this;
1✔
967
      }
968

969
      /** Set to enable success rate ejection. */
970
      public Builder setSuccessRateEjection(
971
          SuccessRateEjection successRateEjection) {
972
        this.successRateEjection = successRateEjection;
1✔
973
        return this;
1✔
974
      }
975

976
      /** Set to enable failure percentage ejection. */
977
      public Builder setFailurePercentageEjection(
978
          FailurePercentageEjection failurePercentageEjection) {
979
        this.failurePercentageEjection = failurePercentageEjection;
1✔
980
        return this;
1✔
981
      }
982

983
      /** Sets the child policy the {@link OutlierDetectionLoadBalancer} delegates to. */
984
      public Builder setChildPolicy(PolicySelection childPolicy) {
985
        checkState(childPolicy != null);
1✔
986
        this.childPolicy = childPolicy;
1✔
987
        return this;
1✔
988
      }
989

990
      /** Builds a new instance of {@link OutlierDetectionLoadBalancerConfig}. */
991
      public OutlierDetectionLoadBalancerConfig build() {
992
        checkState(childPolicy != null);
1✔
993
        return new OutlierDetectionLoadBalancerConfig(intervalNanos, baseEjectionTimeNanos,
1✔
994
            maxEjectionTimeNanos, maxEjectionPercent, successRateEjection,
995
            failurePercentageEjection, childPolicy);
996
      }
997
    }
998

999
    /** The configuration for success rate ejection. */
1000
    public static class SuccessRateEjection {
1001

1002
      public final Integer stdevFactor;
1003
      public final Integer enforcementPercentage;
1004
      public final Integer minimumHosts;
1005
      public final Integer requestVolume;
1006

1007
      SuccessRateEjection(Integer stdevFactor, Integer enforcementPercentage, Integer minimumHosts,
1008
          Integer requestVolume) {
1✔
1009
        this.stdevFactor = stdevFactor;
1✔
1010
        this.enforcementPercentage = enforcementPercentage;
1✔
1011
        this.minimumHosts = minimumHosts;
1✔
1012
        this.requestVolume = requestVolume;
1✔
1013
      }
1✔
1014

1015
      /** Builds new instances of {@link SuccessRateEjection}. */
1016
      public static final class Builder {
1✔
1017

1018
        Integer stdevFactor = 1900;
1✔
1019
        Integer enforcementPercentage = 100;
1✔
1020
        Integer minimumHosts = 5;
1✔
1021
        Integer requestVolume = 100;
1✔
1022

1023
        /** The product of this and the standard deviation of success rates determine the ejection
1024
         * threshold.
1025
         */
1026
        public Builder setStdevFactor(Integer stdevFactor) {
1027
          checkArgument(stdevFactor != null);
1✔
1028
          this.stdevFactor = stdevFactor;
1✔
1029
          return this;
1✔
1030
        }
1031

1032
        /** Only eject this percentage of outliers. */
1033
        public Builder setEnforcementPercentage(Integer enforcementPercentage) {
1034
          checkArgument(enforcementPercentage != null);
1✔
1035
          checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100);
1✔
1036
          this.enforcementPercentage = enforcementPercentage;
1✔
1037
          return this;
1✔
1038
        }
1039

1040
        /** The minimum amount of hosts needed for success rate ejection. */
1041
        public Builder setMinimumHosts(Integer minimumHosts) {
1042
          checkArgument(minimumHosts != null);
1✔
1043
          checkArgument(minimumHosts >= 0);
1✔
1044
          this.minimumHosts = minimumHosts;
1✔
1045
          return this;
1✔
1046
        }
1047

1048
        /** The minimum address request volume to be considered for success rate ejection. */
1049
        public Builder setRequestVolume(Integer requestVolume) {
1050
          checkArgument(requestVolume != null);
1✔
1051
          checkArgument(requestVolume >= 0);
1✔
1052
          this.requestVolume = requestVolume;
1✔
1053
          return this;
1✔
1054
        }
1055

1056
        /** Builds a new instance of {@link SuccessRateEjection}. */
1057
        public SuccessRateEjection build() {
1058
          return new SuccessRateEjection(stdevFactor, enforcementPercentage, minimumHosts,
1✔
1059
              requestVolume);
1060
        }
1061
      }
1062
    }
1063

1064
    /** The configuration for failure percentage ejection. */
1065
    public static class FailurePercentageEjection {
1066
      public final Integer threshold;
1067
      public final Integer enforcementPercentage;
1068
      public final Integer minimumHosts;
1069
      public final Integer requestVolume;
1070

1071
      FailurePercentageEjection(Integer threshold, Integer enforcementPercentage,
1072
          Integer minimumHosts, Integer requestVolume) {
1✔
1073
        this.threshold = threshold;
1✔
1074
        this.enforcementPercentage = enforcementPercentage;
1✔
1075
        this.minimumHosts = minimumHosts;
1✔
1076
        this.requestVolume = requestVolume;
1✔
1077
      }
1✔
1078

1079
      /** For building new {@link FailurePercentageEjection} instances. */
1080
      public static class Builder {
1✔
1081
        Integer threshold = 85;
1✔
1082
        Integer enforcementPercentage = 100;
1✔
1083
        Integer minimumHosts = 5;
1✔
1084
        Integer requestVolume = 50;
1✔
1085

1086
        /** The failure percentage that will result in an address being considered an outlier. */
1087
        public Builder setThreshold(Integer threshold) {
1088
          checkArgument(threshold != null);
1✔
1089
          checkArgument(threshold >= 0 && threshold <= 100);
1✔
1090
          this.threshold = threshold;
1✔
1091
          return this;
1✔
1092
        }
1093

1094
        /** Only eject this percentage of outliers. */
1095
        public Builder setEnforcementPercentage(Integer enforcementPercentage) {
1096
          checkArgument(enforcementPercentage != null);
1✔
1097
          checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100);
1✔
1098
          this.enforcementPercentage = enforcementPercentage;
1✔
1099
          return this;
1✔
1100
        }
1101

1102
        /** The minimum amount of host for failure percentage ejection to be enabled. */
1103
        public Builder setMinimumHosts(Integer minimumHosts) {
1104
          checkArgument(minimumHosts != null);
1✔
1105
          checkArgument(minimumHosts >= 0);
1✔
1106
          this.minimumHosts = minimumHosts;
1✔
1107
          return this;
1✔
1108
        }
1109

1110
        /**
1111
         * The request volume required for an address to be considered for failure percentage
1112
         * ejection.
1113
         */
1114
        public Builder setRequestVolume(Integer requestVolume) {
1115
          checkArgument(requestVolume != null);
1✔
1116
          checkArgument(requestVolume >= 0);
1✔
1117
          this.requestVolume = requestVolume;
1✔
1118
          return this;
1✔
1119
        }
1120

1121
        /** Builds a new instance of {@link FailurePercentageEjection}. */
1122
        public FailurePercentageEjection build() {
1123
          return new FailurePercentageEjection(threshold, enforcementPercentage, minimumHosts,
1✔
1124
              requestVolume);
1125
        }
1126
      }
1127
    }
1128

1129
    /** Determine if any outlier detection algorithms are enabled in the config. */
1130
    boolean outlierDetectionEnabled() {
1131
      return successRateEjection != null || failurePercentageEjection != null;
1✔
1132
    }
1133
  }
1134
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc