• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18910

20 Nov 2023 10:05PM UTC coverage: 88.226% (+0.03%) from 88.201%
#18910

push

github

ejona86
util: Remove shutdown subchannels from OD tracking (#10683)

An OutlierDetectionLoadBalancer child load balancer might decide to
shut down any subchannel it is tracking. We need to make sure that those
subchannels are removed from the outlier detection tracker map to avoid
a memory leak.

30370 of 34423 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.03
/../util/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.util;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static java.util.concurrent.TimeUnit.NANOSECONDS;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ForwardingMap;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableSet;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.ChannelLogger.ChannelLogLevel;
31
import io.grpc.ClientStreamTracer;
32
import io.grpc.ClientStreamTracer.StreamInfo;
33
import io.grpc.ConnectivityState;
34
import io.grpc.ConnectivityStateInfo;
35
import io.grpc.EquivalentAddressGroup;
36
import io.grpc.Internal;
37
import io.grpc.LoadBalancer;
38
import io.grpc.Metadata;
39
import io.grpc.Status;
40
import io.grpc.SynchronizationContext;
41
import io.grpc.SynchronizationContext.ScheduledHandle;
42
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
43
import io.grpc.internal.TimeProvider;
44
import java.net.SocketAddress;
45
import java.util.ArrayList;
46
import java.util.Collection;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Random;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.atomic.AtomicLong;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Wraps a child {@code LoadBalancer} while monitoring for outlier backends and removing them from
59
 * the use of the child LB.
60
 *
61
 * <p>This implements the outlier detection gRFC:
62
 * https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md
63
 */
64
@Internal
65
public final class OutlierDetectionLoadBalancer extends LoadBalancer {
66

67
  @VisibleForTesting
68
  final AddressTrackerMap trackerMap;
69

70
  private final SynchronizationContext syncContext;
71
  private final Helper childHelper;
72
  private final GracefulSwitchLoadBalancer switchLb;
73
  private TimeProvider timeProvider;
74
  private final ScheduledExecutorService timeService;
75
  private ScheduledHandle detectionTimerHandle;
76
  private Long detectionTimerStartNanos;
77

78
  private final ChannelLogger logger;
79

80
  private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
1✔
81
      = Attributes.Key.create("addressTrackerKey");
1✔
82

83
  /**
   * Creates a new instance of {@link OutlierDetectionLoadBalancer}.
   *
   * @param helper the parent helper; must not be {@code null}
   * @param timeProvider clock used to time detection runs and ejections
   * @throws NullPointerException if {@code helper}, its synchronization context or its scheduled
   *     executor service is {@code null}
   */
  public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) {
    // Validate before the first dereference so a null helper fails with a descriptive message
    // rather than an anonymous NullPointerException from getChannelLogger().
    checkNotNull(helper, "helper");
    logger = helper.getChannelLogger();
    childHelper = new ChildHelper(helper);
    switchLb = new GracefulSwitchLoadBalancer(childHelper);
    trackerMap = new AddressTrackerMap();
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
    this.timeProvider = timeProvider;
    logger.log(ChannelLogLevel.DEBUG, "OutlierDetection lb created.");
  }
96

97
  @Override
98
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
99
    logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
100
    OutlierDetectionLoadBalancerConfig config
1✔
101
        = (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
102

103
    // The map should only retain entries for addresses in this latest update.
104
    ArrayList<SocketAddress> addresses = new ArrayList<>();
1✔
105
    for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) {
1✔
106
      addresses.addAll(addressGroup.getAddresses());
1✔
107
    }
1✔
108
    trackerMap.keySet().retainAll(addresses);
1✔
109

110
    trackerMap.updateTrackerConfigs(config);
1✔
111

112
    // Add any new ones.
113
    trackerMap.putNewTrackers(config, addresses);
1✔
114

115
    switchLb.switchTo(config.childPolicy.getProvider());
1✔
116

117
    // If outlier detection is actually configured, start a timer that will periodically try to
118
    // detect outliers.
119
    if (config.outlierDetectionEnabled()) {
1✔
120
      Long initialDelayNanos;
121

122
      if (detectionTimerStartNanos == null) {
1✔
123
        // On the first go we use the configured interval.
124
        initialDelayNanos = config.intervalNanos;
1✔
125
      } else {
126
        // If a timer has started earlier we cancel it and use the difference between the start
127
        // time and now as the interval.
128
        initialDelayNanos = Math.max(0L,
1✔
129
            config.intervalNanos - (timeProvider.currentTimeNanos() - detectionTimerStartNanos));
1✔
130
      }
131

132
      // If a timer has been previously created we need to cancel it and reset all the call counters
133
      // for a fresh start.
134
      if (detectionTimerHandle != null) {
1✔
135
        detectionTimerHandle.cancel();
1✔
136
        trackerMap.resetCallCounters();
1✔
137
      }
138

139
      detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger),
1✔
140
          initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService);
1✔
141
    } else if (detectionTimerHandle != null) {
1✔
142
      // Outlier detection is not configured, but we have a lingering timer. Let's cancel it and
143
      // uneject any addresses we may have ejected.
144
      detectionTimerHandle.cancel();
1✔
145
      detectionTimerStartNanos = null;
1✔
146
      trackerMap.cancelTracking();
1✔
147
    }
148

149
    switchLb.handleResolvedAddresses(
1✔
150
        resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
1✔
151
            .build());
1✔
152
    return Status.OK;
1✔
153
  }
154

155
  @Override
156
  public void handleNameResolutionError(Status error) {
157
    switchLb.handleNameResolutionError(error);
1✔
158
  }
1✔
159

160
  @Override
161
  public void shutdown() {
162
    switchLb.shutdown();
1✔
163
  }
1✔
164

165
  /**
166
   * This timer will be invoked periodically, according to configuration, and it will look for any
167
   * outlier subchannels.
168
   */
169
  class DetectionTimer implements Runnable {
170

171
    OutlierDetectionLoadBalancerConfig config;
172
    ChannelLogger logger;
173

174
    DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger) {
1✔
175
      this.config = config;
1✔
176
      this.logger = logger;
1✔
177
    }
1✔
178

179
    @Override
180
    public void run() {
181
      detectionTimerStartNanos = timeProvider.currentTimeNanos();
1✔
182

183
      trackerMap.swapCounters();
1✔
184

185
      for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) {
1✔
186
        algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
1✔
187
      }
1✔
188

189
      trackerMap.maybeUnejectOutliers(detectionTimerStartNanos);
1✔
190
    }
1✔
191
  }
192

193
  /**
194
   * This child helper wraps the provided helper so that it can hand out wrapped {@link
195
   * OutlierDetectionSubchannel}s and manage the address info map.
196
   */
197
  class ChildHelper extends ForwardingLoadBalancerHelper {
198

199
    private Helper delegate;
200

201
    ChildHelper(Helper delegate) {
1✔
202
      this.delegate = delegate;
1✔
203
    }
1✔
204

205
    @Override
206
    protected Helper delegate() {
207
      return delegate;
1✔
208
    }
209

210
    @Override
211
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
212
      // Subchannels are wrapped so that we can monitor call results and to trigger failures when
213
      // we decide to eject the subchannel.
214
      OutlierDetectionSubchannel subchannel = new OutlierDetectionSubchannel(
1✔
215
          delegate.createSubchannel(args));
1✔
216

217
      // If the subchannel is associated with a single address that is also already in the map
218
      // the subchannel will be added to the map and be included in outlier detection.
219
      List<EquivalentAddressGroup> addressGroups = args.getAddresses();
1✔
220
      if (hasSingleAddress(addressGroups)
1✔
221
          && trackerMap.containsKey(addressGroups.get(0).getAddresses().get(0))) {
1✔
222
        AddressTracker tracker = trackerMap.get(addressGroups.get(0).getAddresses().get(0));
1✔
223
        tracker.addSubchannel(subchannel);
1✔
224

225
        // If this address has already been ejected, we need to immediately eject this Subchannel.
226
        if (tracker.ejectionTimeNanos != null) {
1✔
227
          subchannel.eject();
×
228
        }
229
      }
230

231
      return subchannel;
1✔
232
    }
233

234
    @Override
235
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
236
      delegate.updateBalancingState(newState, new OutlierDetectionPicker(newPicker));
1✔
237
    }
1✔
238
  }
239

240
  class OutlierDetectionSubchannel extends ForwardingSubchannel {
241

242
    private final Subchannel delegate;
243
    private AddressTracker addressTracker;
244
    private boolean ejected;
245
    private ConnectivityStateInfo lastSubchannelState;
246
    private SubchannelStateListener subchannelStateListener;
247
    private final ChannelLogger logger;
248

249
    OutlierDetectionSubchannel(Subchannel delegate) {
1✔
250
      this.delegate = delegate;
1✔
251
      this.logger = delegate.getChannelLogger();
1✔
252
    }
1✔
253

254
    @Override
255
    public void start(SubchannelStateListener listener) {
256
      subchannelStateListener = listener;
1✔
257
      super.start(new OutlierDetectionSubchannelStateListener(listener));
1✔
258
    }
1✔
259

260
    @Override
261
    public void shutdown() {
262
      if (addressTracker != null) {
1✔
263
        addressTracker.removeSubchannel(this);
1✔
264
      }
265
      super.shutdown();
1✔
266
    }
1✔
267

268
    @Override
269
    public Attributes getAttributes() {
270
      if (addressTracker != null) {
1✔
271
        return delegate.getAttributes().toBuilder().set(ADDRESS_TRACKER_ATTR_KEY, addressTracker)
1✔
272
            .build();
1✔
273
      } else {
274
        return delegate.getAttributes();
×
275
      }
276
    }
277

278
    @Override
279
    public void updateAddresses(List<EquivalentAddressGroup> addressGroups) {
280
      // Outlier detection only supports subchannels with a single address, but the list of
281
      // addressGroups associated with a subchannel can change at any time, so we need to react to
282
      // changes in the address list plurality.
283

284
      // No change in address plurality, we replace the single one with a new one.
285
      if (hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) {
1✔
286
        // Remove the current subchannel from the old address it is associated with in the map.
287
        if (trackerMap.containsValue(addressTracker)) {
1✔
288
          addressTracker.removeSubchannel(this);
1✔
289
        }
290

291
        // If the map has an entry for the new address, we associate this subchannel with it.
292
        SocketAddress address = addressGroups.get(0).getAddresses().get(0);
1✔
293
        if (trackerMap.containsKey(address)) {
1✔
294
          trackerMap.get(address).addSubchannel(this);
1✔
295
        }
296
      } else if (hasSingleAddress(getAllAddresses()) && !hasSingleAddress(addressGroups)) {
1✔
297
        // We go from a single address to having multiple, making this subchannel uneligible for
298
        // outlier detection. Remove it from all trackers and reset the call counters of all the
299
        // associated trackers.
300
        // Remove the current subchannel from the old address it is associated with in the map.
301
        if (trackerMap.containsKey(getAddresses().getAddresses().get(0))) {
1✔
302
          AddressTracker tracker = trackerMap.get(getAddresses().getAddresses().get(0));
1✔
303
          tracker.removeSubchannel(this);
1✔
304
          tracker.resetCallCounters();
1✔
305
        }
1✔
306
      } else if (!hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) {
1✔
307
        // We go from, previously uneligble, multiple address mode to a single address. If the map
308
        // has an entry for the new address, we associate this subchannel with it.
309
        SocketAddress address = addressGroups.get(0).getAddresses().get(0);
1✔
310
        if (trackerMap.containsKey(address)) {
1✔
311
          AddressTracker tracker = trackerMap.get(address);
1✔
312
          tracker.addSubchannel(this);
1✔
313
        }
314
      }
315

316
      // We could also have multiple addressGroups and get an update for multiple new ones. This is
317
      // a no-op as we will just continue to ignore multiple address subchannels.
318

319
      delegate.updateAddresses(addressGroups);
1✔
320
    }
1✔
321

322
    /**
323
     * If the {@link Subchannel} is considered for outlier detection the associated {@link
324
     * AddressTracker} should be set.
325
     */
326
    void setAddressTracker(AddressTracker addressTracker) {
327
      this.addressTracker = addressTracker;
1✔
328
    }
1✔
329

330
    void clearAddressTracker() {
331
      this.addressTracker = null;
1✔
332
    }
1✔
333

334
    void eject() {
335
      ejected = true;
1✔
336
      subchannelStateListener.onSubchannelState(
1✔
337
          ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
1✔
338
      logger.log(ChannelLogLevel.INFO, "Subchannel ejected: {0}", this);
1✔
339
    }
1✔
340

341
    void uneject() {
342
      ejected = false;
1✔
343
      if (lastSubchannelState != null) {
1✔
344
        subchannelStateListener.onSubchannelState(lastSubchannelState);
1✔
345
        logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
1✔
346
      }
347
    }
1✔
348

349
    boolean isEjected() {
350
      return ejected;
1✔
351
    }
352

353
    @Override
354
    protected Subchannel delegate() {
355
      return delegate;
1✔
356
    }
357

358
    /**
359
     * Wraps the actual listener so that state changes from the actual one can be intercepted.
360
     */
361
    class OutlierDetectionSubchannelStateListener implements SubchannelStateListener {
362

363
      private final SubchannelStateListener delegate;
364

365
      OutlierDetectionSubchannelStateListener(SubchannelStateListener delegate) {
1✔
366
        this.delegate = delegate;
1✔
367
      }
1✔
368

369
      @Override
370
      public void onSubchannelState(ConnectivityStateInfo newState) {
371
        lastSubchannelState = newState;
1✔
372
        if (!ejected) {
1✔
373
          delegate.onSubchannelState(newState);
1✔
374
        }
375
      }
1✔
376
    }
377

378
    @Override
379
    public String toString() {
380
      return "OutlierDetectionSubchannel{"
×
381
              + "addresses=" + delegate.getAllAddresses()
×
382
              + '}';
383
    }
384
  }
385

386

387
  /**
388
   * This picker delegates the actual picking logic to a wrapped delegate, but associates a {@link
389
   * ClientStreamTracer} with each pick to track the results of each subchannel stream.
390
   */
391
  class OutlierDetectionPicker extends SubchannelPicker {
392

393
    private final SubchannelPicker delegate;
394

395
    OutlierDetectionPicker(SubchannelPicker delegate) {
1✔
396
      this.delegate = delegate;
1✔
397
    }
1✔
398

399
    @Override
400
    public PickResult pickSubchannel(PickSubchannelArgs args) {
401
      PickResult pickResult = delegate.pickSubchannel(args);
1✔
402

403
      Subchannel subchannel = pickResult.getSubchannel();
1✔
404
      if (subchannel != null) {
1✔
405
        return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory(
1✔
406
            subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY),
1✔
407
            pickResult.getStreamTracerFactory()));
1✔
408
      }
409

410
      return pickResult;
×
411
    }
412

413
    /**
414
     * Builds instances of a {@link ClientStreamTracer} that increments the call count in the
415
     * tracker for each closed stream.
416
     */
417
    class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory {
418

419
      private final AddressTracker tracker;
420

421
      @Nullable
422
      private final ClientStreamTracer.Factory delegateFactory;
423

424
      ResultCountingClientStreamTracerFactory(AddressTracker tracker,
425
          @Nullable ClientStreamTracer.Factory delegateFactory) {
1✔
426
        this.tracker = tracker;
1✔
427
        this.delegateFactory = delegateFactory;
1✔
428
      }
1✔
429

430
      @Override
431
      public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
432
        if (delegateFactory != null) {
1✔
433
          ClientStreamTracer delegateTracer = delegateFactory.newClientStreamTracer(info, headers);
1✔
434
          return new ForwardingClientStreamTracer() {
1✔
435
            @Override
436
            protected ClientStreamTracer delegate() {
437
              return delegateTracer;
1✔
438
            }
439

440
            @Override
441
            public void streamClosed(Status status) {
442
              tracker.incrementCallCount(status.isOk());
1✔
443
              delegate().streamClosed(status);
1✔
444
            }
1✔
445
          };
446
        } else {
447
          return new ClientStreamTracer() {
1✔
448
            @Override
449
            public void streamClosed(Status status) {
450
              tracker.incrementCallCount(status.isOk());
1✔
451
            }
1✔
452
          };
453
        }
454
      }
455
    }
456
  }
457

458
  /**
459
   * Tracks additional information about a set of equivalent addresses needed for outlier
460
   * detection.
461
   */
462
  static class AddressTracker {
463

464
    private OutlierDetectionLoadBalancerConfig config;
465
    // Marked as volatile to assure that when the inactive counter is swapped in as the new active
466
    // one, all threads see the change and don't hold on to a reference to the now inactive counter.
467
    private volatile CallCounter activeCallCounter = new CallCounter();
1✔
468
    private CallCounter inactiveCallCounter = new CallCounter();
1✔
469
    private Long ejectionTimeNanos;
470
    private int ejectionTimeMultiplier;
471
    private final Set<OutlierDetectionSubchannel> subchannels = new HashSet<>();
1✔
472

473
    AddressTracker(OutlierDetectionLoadBalancerConfig config) {
1✔
474
      this.config = config;
1✔
475
    }
1✔
476

477
    void setConfig(OutlierDetectionLoadBalancerConfig config) {
478
      this.config = config;
1✔
479
    }
1✔
480

481
    /**
482
     * Adds a subchannel to the tracker, while assuring that the subchannel ejection status is
483
     * updated to match the tracker's if needed.
484
     */
485
    boolean addSubchannel(OutlierDetectionSubchannel subchannel) {
486
      // Make sure that the subchannel is in the same ejection state as the new tracker it is
487
      // associated with.
488
      if (subchannelsEjected() && !subchannel.isEjected()) {
1✔
489
        subchannel.eject();
×
490
      } else if (!subchannelsEjected() && subchannel.isEjected()) {
1✔
491
        subchannel.uneject();
1✔
492
      }
493
      subchannel.setAddressTracker(this);
1✔
494
      return subchannels.add(subchannel);
1✔
495
    }
496

497
    boolean removeSubchannel(OutlierDetectionSubchannel subchannel) {
498
      subchannel.clearAddressTracker();
1✔
499
      return subchannels.remove(subchannel);
1✔
500
    }
501

502
    boolean containsSubchannel(OutlierDetectionSubchannel subchannel) {
503
      return subchannels.contains(subchannel);
×
504
    }
505

506
    @VisibleForTesting
507
    Set<OutlierDetectionSubchannel> getSubchannels() {
508
      return ImmutableSet.copyOf(subchannels);
1✔
509
    }
510

511
    void incrementCallCount(boolean success) {
512
      // If neither algorithm is configured, no point in incrementing counters.
513
      if (config.successRateEjection == null && config.failurePercentageEjection == null) {
1✔
514
        return;
×
515
      }
516

517
      if (success) {
1✔
518
        activeCallCounter.successCount.getAndIncrement();
1✔
519
      } else {
520
        activeCallCounter.failureCount.getAndIncrement();
1✔
521
      }
522
    }
1✔
523

524
    @VisibleForTesting
525
    long activeVolume() {
526
      return activeCallCounter.successCount.get() + activeCallCounter.failureCount.get();
1✔
527
    }
528

529
    long inactiveVolume() {
530
      return inactiveCallCounter.successCount.get() + inactiveCallCounter.failureCount.get();
1✔
531
    }
532

533
    double successRate() {
534
      return ((double) inactiveCallCounter.successCount.get()) / inactiveVolume();
1✔
535
    }
536

537
    double failureRate() {
538
      return ((double)inactiveCallCounter.failureCount.get()) / inactiveVolume();
1✔
539
    }
540

541
    void resetCallCounters() {
542
      activeCallCounter.reset();
1✔
543
      inactiveCallCounter.reset();
1✔
544
    }
1✔
545

546
    void decrementEjectionTimeMultiplier() {
547
      // The multiplier should not go negative.
548
      ejectionTimeMultiplier = ejectionTimeMultiplier == 0 ? 0 : ejectionTimeMultiplier - 1;
1✔
549
    }
1✔
550

551
    void resetEjectionTimeMultiplier() {
552
      ejectionTimeMultiplier = 0;
1✔
553
    }
1✔
554

555
    /**
556
     * Swaps the active and inactive counters.
557
     *
558
     * <p>Note that this method is not thread safe as the swap is not done atomically. This is
559
     * expected to only be called from the timer that is scheduled at a fixed delay, assuring that
560
     * only one timer is active at a time.
561
     */
562
    void swapCounters() {
563
      inactiveCallCounter.reset();
1✔
564
      CallCounter tempCounter = activeCallCounter;
1✔
565
      activeCallCounter = inactiveCallCounter;
1✔
566
      inactiveCallCounter = tempCounter;
1✔
567
    }
1✔
568

569
    void ejectSubchannels(long ejectionTimeNanos) {
570
      this.ejectionTimeNanos = ejectionTimeNanos;
1✔
571
      ejectionTimeMultiplier++;
1✔
572
      for (OutlierDetectionSubchannel subchannel : subchannels) {
1✔
573
        subchannel.eject();
1✔
574
      }
1✔
575
    }
1✔
576

577
    /**
578
     * Uneject a currently ejected address.
579
     */
580
    void unejectSubchannels() {
581
      checkState(ejectionTimeNanos != null, "not currently ejected");
1✔
582
      ejectionTimeNanos = null;
1✔
583
      for (OutlierDetectionSubchannel subchannel : subchannels) {
1✔
584
        subchannel.uneject();
1✔
585
      }
1✔
586
    }
1✔
587

588
    boolean subchannelsEjected() {
589
      return ejectionTimeNanos != null;
1✔
590
    }
591

592
    public boolean maxEjectionTimeElapsed(long currentTimeNanos) {
593
      // The instant in time beyond which the address should no longer be ejected. Also making sure
594
      // we honor any maximum ejection time setting.
595
      long maxEjectionDurationSecs
1✔
596
          = Math.max(config.baseEjectionTimeNanos, config.maxEjectionTimeNanos);
1✔
597
      long maxEjectionTimeNanos =
1✔
598
          ejectionTimeNanos + Math.min(
1✔
599
              config.baseEjectionTimeNanos * ejectionTimeMultiplier,
1✔
600
              maxEjectionDurationSecs);
601

602
      return currentTimeNanos > maxEjectionTimeNanos;
1✔
603
    }
604

605
    /** Tracks both successful and failed call counts. */
606
    private static class CallCounter {
1✔
607
      AtomicLong successCount = new AtomicLong();
1✔
608
      AtomicLong failureCount = new AtomicLong();
1✔
609

610
      void reset() {
611
        successCount.set(0);
1✔
612
        failureCount.set(0);
1✔
613
      }
1✔
614
    }
615

616
    @Override
617
    public String toString() {
618
      return "AddressTracker{"
×
619
              + "subchannels=" + subchannels
620
              + '}';
621
    }
622
  }
623

624
  /**
625
   * Maintains a mapping from addresses to their trackers.
626
   */
627
  static class AddressTrackerMap extends ForwardingMap<SocketAddress, AddressTracker> {
628
    private final Map<SocketAddress, AddressTracker> trackerMap;
629

630
    AddressTrackerMap() {
1✔
631
      trackerMap = new HashMap<>();
1✔
632
    }
1✔
633

634
    @Override
635
    protected Map<SocketAddress, AddressTracker> delegate() {
636
      return trackerMap;
1✔
637
    }
638

639
    void updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config) {
640
      for (AddressTracker tracker: trackerMap.values()) {
1✔
641
        tracker.setConfig(config);
1✔
642
      }
1✔
643
    }
1✔
644

645
    /** Adds a new tracker for every given address. */
646
    void putNewTrackers(OutlierDetectionLoadBalancerConfig config,
647
        Collection<SocketAddress> addresses) {
648
      for (SocketAddress address : addresses) {
1✔
649
        if (!trackerMap.containsKey(address)) {
1✔
650
          trackerMap.put(address, new AddressTracker(config));
1✔
651
        }
652
      }
1✔
653
    }
1✔
654

655
    /** Resets the call counters for all the trackers in the map. */
656
    void resetCallCounters() {
657
      for (AddressTracker tracker : trackerMap.values()) {
1✔
658
        tracker.resetCallCounters();
1✔
659
      }
1✔
660
    }
1✔
661

662
    /**
663
     * When OD gets disabled we need to uneject any subchannels that may have been ejected and
664
     * to reset the ejection time multiplier.
665
     */
666
    void cancelTracking() {
667
      for (AddressTracker tracker : trackerMap.values()) {
1✔
668
        if (tracker.subchannelsEjected()) {
1✔
669
          tracker.unejectSubchannels();
×
670
        }
671
        tracker.resetEjectionTimeMultiplier();
1✔
672
      }
1✔
673
    }
1✔
674

675
    /** Swaps the active and inactive counters for each tracker. */
676
    void swapCounters() {
677
      for (AddressTracker tracker : trackerMap.values()) {
1✔
678
        tracker.swapCounters();
1✔
679
      }
1✔
680
    }
1✔
681

682
    /**
683
     * At the end of a timer run we need to decrement the ejection time multiplier for trackers
684
     * that don't have ejected subchannels and uneject ones that have spent the maximum ejection
685
     * time allowed.
686
     */
687
    void maybeUnejectOutliers(Long detectionTimerStartNanos) {
688
      for (AddressTracker tracker : trackerMap.values()) {
1✔
689
        if (!tracker.subchannelsEjected()) {
1✔
690
          tracker.decrementEjectionTimeMultiplier();
1✔
691
        }
692

693
        if (tracker.subchannelsEjected() && tracker.maxEjectionTimeElapsed(
1✔
694
            detectionTimerStartNanos)) {
1✔
695
          tracker.unejectSubchannels();
1✔
696
        }
697
      }
1✔
698
    }
1✔
699

700
    /**
701
     * How many percent of the addresses have been ejected.
702
     */
703
    double ejectionPercentage() {
704
      if (trackerMap.isEmpty()) {
1✔
705
        return 0;
×
706
      }
707
      int totalAddresses = 0;
1✔
708
      int ejectedAddresses = 0;
1✔
709
      for (AddressTracker tracker : trackerMap.values()) {
1✔
710
        totalAddresses++;
1✔
711
        if (tracker.subchannelsEjected()) {
1✔
712
          ejectedAddresses++;
1✔
713
        }
714
      }
1✔
715
      return ((double)ejectedAddresses / totalAddresses) * 100;
1✔
716
    }
717
  }
718

719

720
  /**
721
   * Implementations provide different ways of ejecting outlier addresses..
722
   */
723
  interface OutlierEjectionAlgorithm {
724

725
    /** Eject any outlier addresses. */
726
    void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos);
727

728
    /** Builds a list of algorithms that are enabled in the given config. */
729
    @Nullable
730
    static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config,
731
                                                    ChannelLogger logger) {
732
      ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder();
1✔
733
      if (config.successRateEjection != null) {
1✔
734
        algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config, logger));
1✔
735
      }
736
      if (config.failurePercentageEjection != null) {
1✔
737
        algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config, logger));
1✔
738
      }
739
      return algoListBuilder.build();
1✔
740
    }
741
  }
742

743
  /**
744
   * This algorithm ejects addresses that don't maintain a required rate of successful calls. The
745
   * required rate is not fixed, but is based on the mean and standard deviation of the success
746
   * rates of all of the addresses.
747
   */
748
  static class SuccessRateOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm {
749

750
    // Full config; successRateEjection is guaranteed non-null by the constructor.
    private final OutlierDetectionLoadBalancerConfig config;

    // Receives the DEBUG line emitted for every detected outlier.
    private final ChannelLogger logger;

    /**
     * Creates the algorithm for a config that has success rate ejection configured.
     *
     * @throws IllegalArgumentException if {@code config.successRateEjection} is {@code null}
     */
    SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
                                        ChannelLogger logger) {
      boolean successRateConfigured = config.successRateEjection != null;
      checkArgument(successRateConfigured, "success rate ejection config is null");
      this.logger = logger;
      this.config = config;
    }
760

761
    @Override
762
    public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
763

764
      // Only consider addresses that have the minimum request volume specified in the config.
765
      List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
1✔
766
          config.successRateEjection.requestVolume);
1✔
767
      // If we don't have enough addresses with significant volume then there's nothing to do.
768
      if (trackersWithVolume.size() < config.successRateEjection.minimumHosts
1✔
769
          || trackersWithVolume.size() == 0) {
1✔
770
        return;
1✔
771
      }
772

773
      // Calculate mean and standard deviation of the fractions of successful calls.
774
      List<Double> successRates = new ArrayList<>();
1✔
775
      for (AddressTracker tracker : trackersWithVolume) {
1✔
776
        successRates.add(tracker.successRate());
1✔
777
      }
1✔
778
      double mean = mean(successRates);
1✔
779
      double stdev = standardDeviation(successRates, mean);
1✔
780

781
      double requiredSuccessRate =
1✔
782
          mean - stdev * (config.successRateEjection.stdevFactor / 1000f);
1✔
783

784
      for (AddressTracker tracker : trackersWithVolume) {
1✔
785
        // If we are above or equal to the max ejection percentage, don't eject any more. This will
786
        // allow the total ejections to go one above the max, but at the same time it assures at
787
        // least one ejection, which the spec calls for. This behavior matches what Envoy proxy
788
        // does.
789
        if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) {
1✔
790
          return;
1✔
791
        }
792

793
        // If success rate is below the threshold, eject the address.
794
        if (tracker.successRate() < requiredSuccessRate) {
1✔
795
          logger.log(ChannelLogLevel.DEBUG,
1✔
796
                  "SuccessRate algorithm detected outlier: {0}. "
797
                          + "Parameters: successRate={1}, mean={2}, stdev={3}, "
798
                          + "requiredSuccessRate={4}",
799
                  tracker, tracker.successRate(),  mean, stdev, requiredSuccessRate);
1✔
800
          // Only eject some addresses based on the enforcement percentage.
801
          if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
1✔
802
            tracker.ejectSubchannels(ejectionTimeNanos);
1✔
803
          }
804
        }
805
      }
1✔
806
    }
1✔
807

808
    /** Calculates the mean of the given values. */
809
    @VisibleForTesting
810
    static double mean(Collection<Double> values) {
811
      double totalValue = 0;
1✔
812
      for (double value : values) {
1✔
813
        totalValue += value;
1✔
814
      }
1✔
815

816
      return totalValue / values.size();
1✔
817
    }
818

819
    /** Calculates the standard deviation for the given values and their mean. */
820
    @VisibleForTesting
821
    static double standardDeviation(Collection<Double> values, double mean) {
822
      double squaredDifferenceSum = 0;
1✔
823
      for (double value : values) {
1✔
824
        double difference = value - mean;
1✔
825
        squaredDifferenceSum += difference * difference;
1✔
826
      }
1✔
827
      double variance = squaredDifferenceSum / values.size();
1✔
828

829
      return Math.sqrt(variance);
1✔
830
    }
831
  }
832

833
  static class FailurePercentageOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm {
834

835
    private final OutlierDetectionLoadBalancerConfig config;
836

837
    private final ChannelLogger logger;
838

839
    FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
840
                                              ChannelLogger logger) {
1✔
841
      this.config = config;
1✔
842
      this.logger = logger;
1✔
843
    }
1✔
844

845
    @Override
846
    public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
847

848
      // Only consider addresses that have the minimum request volume specified in the config.
849
      List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
1✔
850
          config.failurePercentageEjection.requestVolume);
1✔
851
      // If we don't have enough addresses with significant volume then there's nothing to do.
852
      if (trackersWithVolume.size() < config.failurePercentageEjection.minimumHosts
1✔
853
          || trackersWithVolume.size() == 0) {
1✔
854
        return;
1✔
855
      }
856

857
      // If this address does not have enough volume to be considered, skip to the next one.
858
      for (AddressTracker tracker : trackersWithVolume) {
1✔
859
        // If we are above or equal to the max ejection percentage, don't eject any more. This will
860
        // allow the total ejections to go one above the max, but at the same time it assures at
861
        // least one ejection, which the spec calls for. This behavior matches what Envoy proxy
862
        // does.
863
        if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) {
1✔
864
          return;
×
865
        }
866

867
        if (tracker.inactiveVolume() < config.failurePercentageEjection.requestVolume) {
1✔
868
          continue;
×
869
        }
870

871
        // If the failure rate is above the threshold, we should eject...
872
        double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100;
1✔
873
        if (tracker.failureRate() > maxFailureRate) {
1✔
874
          logger.log(ChannelLogLevel.DEBUG,
1✔
875
                  "FailurePercentage algorithm detected outlier: {0}, failureRate={1}",
876
                  tracker, tracker.failureRate());
1✔
877
          // ...but only enforce this based on the enforcement percentage.
878
          if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) {
1✔
879
            tracker.ejectSubchannels(ejectionTimeNanos);
1✔
880
          }
881
        }
882
      }
1✔
883
    }
1✔
884
  }
885

886
  /** Returns only the trackers that have the minimum configured volume to be considered. */
887
  private static List<AddressTracker> trackersWithVolume(AddressTrackerMap trackerMap,
888
      int volume) {
889
    List<AddressTracker> trackersWithVolume = new ArrayList<>();
1✔
890
    for (AddressTracker tracker : trackerMap.values()) {
1✔
891
      if (tracker.inactiveVolume() >= volume) {
1✔
892
        trackersWithVolume.add(tracker);
1✔
893
      }
894
    }
1✔
895
    return trackersWithVolume;
1✔
896
  }
897

898
  /** Counts how many addresses are in a given address group. */
899
  private static boolean hasSingleAddress(List<EquivalentAddressGroup> addressGroups) {
900
    int addressCount = 0;
1✔
901
    for (EquivalentAddressGroup addressGroup : addressGroups) {
1✔
902
      addressCount += addressGroup.getAddresses().size();
1✔
903
      if (addressCount > 1) {
1✔
904
        return false;
1✔
905
      }
906
    }
1✔
907
    return true;
1✔
908
  }
909

910
  /**
911
   * The configuration for {@link OutlierDetectionLoadBalancer}.
912
   */
913
  public static final class OutlierDetectionLoadBalancerConfig {
914

915
    public final Long intervalNanos;
916
    public final Long baseEjectionTimeNanos;
917
    public final Long maxEjectionTimeNanos;
918
    public final Integer maxEjectionPercent;
919
    public final SuccessRateEjection successRateEjection;
920
    public final FailurePercentageEjection failurePercentageEjection;
921
    public final PolicySelection childPolicy;
922

923
    private OutlierDetectionLoadBalancerConfig(Long intervalNanos,
924
        Long baseEjectionTimeNanos,
925
        Long maxEjectionTimeNanos,
926
        Integer maxEjectionPercent,
927
        SuccessRateEjection successRateEjection,
928
        FailurePercentageEjection failurePercentageEjection,
929
        PolicySelection childPolicy) {
1✔
930
      this.intervalNanos = intervalNanos;
1✔
931
      this.baseEjectionTimeNanos = baseEjectionTimeNanos;
1✔
932
      this.maxEjectionTimeNanos = maxEjectionTimeNanos;
1✔
933
      this.maxEjectionPercent = maxEjectionPercent;
1✔
934
      this.successRateEjection = successRateEjection;
1✔
935
      this.failurePercentageEjection = failurePercentageEjection;
1✔
936
      this.childPolicy = childPolicy;
1✔
937
    }
1✔
938

939
    /** Builds a new {@link OutlierDetectionLoadBalancerConfig}. */
940
    public static class Builder {
1✔
941
      Long intervalNanos = 10_000_000_000L; // 10s
1✔
942
      Long baseEjectionTimeNanos = 30_000_000_000L; // 30s
1✔
943
      Long maxEjectionTimeNanos = 300_000_000_000L; // 300s
1✔
944
      Integer maxEjectionPercent = 10;
1✔
945
      SuccessRateEjection successRateEjection;
946
      FailurePercentageEjection failurePercentageEjection;
947
      PolicySelection childPolicy;
948

949
      /** The interval between outlier detection sweeps. */
950
      public Builder setIntervalNanos(Long intervalNanos) {
951
        checkArgument(intervalNanos != null);
1✔
952
        this.intervalNanos = intervalNanos;
1✔
953
        return this;
1✔
954
      }
955

956
      /** The base time an address is ejected for. */
957
      public Builder setBaseEjectionTimeNanos(Long baseEjectionTimeNanos) {
958
        checkArgument(baseEjectionTimeNanos != null);
1✔
959
        this.baseEjectionTimeNanos = baseEjectionTimeNanos;
1✔
960
        return this;
1✔
961
      }
962

963
      /** The longest time an address can be ejected. */
964
      public Builder setMaxEjectionTimeNanos(Long maxEjectionTimeNanos) {
965
        checkArgument(maxEjectionTimeNanos != null);
1✔
966
        this.maxEjectionTimeNanos = maxEjectionTimeNanos;
1✔
967
        return this;
1✔
968
      }
969

970
      /** The algorithm agnostic maximum percentage of addresses that can be ejected. */
971
      public Builder setMaxEjectionPercent(Integer maxEjectionPercent) {
972
        checkArgument(maxEjectionPercent != null);
1✔
973
        this.maxEjectionPercent = maxEjectionPercent;
1✔
974
        return this;
1✔
975
      }
976

977
      /** Set to enable success rate ejection. */
978
      public Builder setSuccessRateEjection(
979
          SuccessRateEjection successRateEjection) {
980
        this.successRateEjection = successRateEjection;
1✔
981
        return this;
1✔
982
      }
983

984
      /** Set to enable failure percentage ejection. */
985
      public Builder setFailurePercentageEjection(
986
          FailurePercentageEjection failurePercentageEjection) {
987
        this.failurePercentageEjection = failurePercentageEjection;
1✔
988
        return this;
1✔
989
      }
990

991
      /** Sets the child policy the {@link OutlierDetectionLoadBalancer} delegates to. */
992
      public Builder setChildPolicy(PolicySelection childPolicy) {
993
        checkState(childPolicy != null);
1✔
994
        this.childPolicy = childPolicy;
1✔
995
        return this;
1✔
996
      }
997

998
      /** Builds a new instance of {@link OutlierDetectionLoadBalancerConfig}. */
999
      public OutlierDetectionLoadBalancerConfig build() {
1000
        checkState(childPolicy != null);
1✔
1001
        return new OutlierDetectionLoadBalancerConfig(intervalNanos, baseEjectionTimeNanos,
1✔
1002
            maxEjectionTimeNanos, maxEjectionPercent, successRateEjection,
1003
            failurePercentageEjection, childPolicy);
1004
      }
1005
    }
1006

1007
    /** The configuration for success rate ejection. */
1008
    public static class SuccessRateEjection {
1009

1010
      public final Integer stdevFactor;
1011
      public final Integer enforcementPercentage;
1012
      public final Integer minimumHosts;
1013
      public final Integer requestVolume;
1014

1015
      SuccessRateEjection(Integer stdevFactor, Integer enforcementPercentage, Integer minimumHosts,
1016
          Integer requestVolume) {
1✔
1017
        this.stdevFactor = stdevFactor;
1✔
1018
        this.enforcementPercentage = enforcementPercentage;
1✔
1019
        this.minimumHosts = minimumHosts;
1✔
1020
        this.requestVolume = requestVolume;
1✔
1021
      }
1✔
1022

1023
      /** Builds new instances of {@link SuccessRateEjection}. */
1024
      public static final class Builder {
1✔
1025

1026
        Integer stdevFactor = 1900;
1✔
1027
        Integer enforcementPercentage = 100;
1✔
1028
        Integer minimumHosts = 5;
1✔
1029
        Integer requestVolume = 100;
1✔
1030

1031
        /** The product of this and the standard deviation of success rates determine the ejection
1032
         * threshold.
1033
         */
1034
        public Builder setStdevFactor(Integer stdevFactor) {
1035
          checkArgument(stdevFactor != null);
1✔
1036
          this.stdevFactor = stdevFactor;
1✔
1037
          return this;
1✔
1038
        }
1039

1040
        /** Only eject this percentage of outliers. */
1041
        public Builder setEnforcementPercentage(Integer enforcementPercentage) {
1042
          checkArgument(enforcementPercentage != null);
1✔
1043
          checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100);
1✔
1044
          this.enforcementPercentage = enforcementPercentage;
1✔
1045
          return this;
1✔
1046
        }
1047

1048
        /** The minimum amount of hosts needed for success rate ejection. */
1049
        public Builder setMinimumHosts(Integer minimumHosts) {
1050
          checkArgument(minimumHosts != null);
1✔
1051
          checkArgument(minimumHosts >= 0);
1✔
1052
          this.minimumHosts = minimumHosts;
1✔
1053
          return this;
1✔
1054
        }
1055

1056
        /** The minimum address request volume to be considered for success rate ejection. */
1057
        public Builder setRequestVolume(Integer requestVolume) {
1058
          checkArgument(requestVolume != null);
1✔
1059
          checkArgument(requestVolume >= 0);
1✔
1060
          this.requestVolume = requestVolume;
1✔
1061
          return this;
1✔
1062
        }
1063

1064
        /** Builds a new instance of {@link SuccessRateEjection}. */
1065
        public SuccessRateEjection build() {
1066
          return new SuccessRateEjection(stdevFactor, enforcementPercentage, minimumHosts,
1✔
1067
              requestVolume);
1068
        }
1069
      }
1070
    }
1071

1072
    /** The configuration for failure percentage ejection. */
1073
    public static class FailurePercentageEjection {
1074
      public final Integer threshold;
1075
      public final Integer enforcementPercentage;
1076
      public final Integer minimumHosts;
1077
      public final Integer requestVolume;
1078

1079
      FailurePercentageEjection(Integer threshold, Integer enforcementPercentage,
1080
          Integer minimumHosts, Integer requestVolume) {
1✔
1081
        this.threshold = threshold;
1✔
1082
        this.enforcementPercentage = enforcementPercentage;
1✔
1083
        this.minimumHosts = minimumHosts;
1✔
1084
        this.requestVolume = requestVolume;
1✔
1085
      }
1✔
1086

1087
      /** For building new {@link FailurePercentageEjection} instances. */
1088
      public static class Builder {
1✔
1089
        Integer threshold = 85;
1✔
1090
        Integer enforcementPercentage = 100;
1✔
1091
        Integer minimumHosts = 5;
1✔
1092
        Integer requestVolume = 50;
1✔
1093

1094
        /** The failure percentage that will result in an address being considered an outlier. */
1095
        public Builder setThreshold(Integer threshold) {
1096
          checkArgument(threshold != null);
1✔
1097
          checkArgument(threshold >= 0 && threshold <= 100);
1✔
1098
          this.threshold = threshold;
1✔
1099
          return this;
1✔
1100
        }
1101

1102
        /** Only eject this percentage of outliers. */
1103
        public Builder setEnforcementPercentage(Integer enforcementPercentage) {
1104
          checkArgument(enforcementPercentage != null);
1✔
1105
          checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100);
1✔
1106
          this.enforcementPercentage = enforcementPercentage;
1✔
1107
          return this;
1✔
1108
        }
1109

1110
        /** The minimum amount of host for failure percentage ejection to be enabled. */
1111
        public Builder setMinimumHosts(Integer minimumHosts) {
1112
          checkArgument(minimumHosts != null);
1✔
1113
          checkArgument(minimumHosts >= 0);
1✔
1114
          this.minimumHosts = minimumHosts;
1✔
1115
          return this;
1✔
1116
        }
1117

1118
        /**
1119
         * The request volume required for an address to be considered for failure percentage
1120
         * ejection.
1121
         */
1122
        public Builder setRequestVolume(Integer requestVolume) {
1123
          checkArgument(requestVolume != null);
1✔
1124
          checkArgument(requestVolume >= 0);
1✔
1125
          this.requestVolume = requestVolume;
1✔
1126
          return this;
1✔
1127
        }
1128

1129
        /** Builds a new instance of {@link FailurePercentageEjection}. */
1130
        public FailurePercentageEjection build() {
1131
          return new FailurePercentageEjection(threshold, enforcementPercentage, minimumHosts,
1✔
1132
              requestVolume);
1133
        }
1134
      }
1135
    }
1136

1137
    /** Determine if any outlier detection algorithms are enabled in the config. */
1138
    boolean outlierDetectionEnabled() {
1139
      return successRateEjection != null || failurePercentageEjection != null;
1✔
1140
    }
1141
  }
1142
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc