• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18890

09 Nov 2023 09:46PM UTC coverage: 88.206% (-0.06%) from 88.264%
#18890

push

github

web-flow
xds:Make Ring Hash LB a petiole policy (#10610)

* Update picker logic per A61 that it no longer pays attention to the first 2 elements, but rather takes the first ring element not in TF and uses that.
---------
Pulled in by rebase:
Eric Anderson  (android: Remove unneeded proguard rule 44723b6)
Terry Wilson (stub: Deprecate StreamObservers b5434e8)

30334 of 34390 relevant lines covered (88.21%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.35
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkElementIndex;
21
import static com.google.common.base.Preconditions.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.collect.ImmutableList;
27
import io.grpc.ConnectivityState;
28
import io.grpc.ConnectivityStateInfo;
29
import io.grpc.Deadline.Ticker;
30
import io.grpc.EquivalentAddressGroup;
31
import io.grpc.ExperimentalApi;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.NameResolver;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext;
37
import io.grpc.SynchronizationContext.ScheduledHandle;
38
import io.grpc.services.MetricReport;
39
import io.grpc.util.ForwardingLoadBalancerHelper;
40
import io.grpc.util.ForwardingSubchannel;
41
import io.grpc.util.RoundRobinLoadBalancer;
42
import io.grpc.xds.orca.OrcaOobUtil;
43
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
44
import io.grpc.xds.orca.OrcaPerRequestUtil;
45
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
46
import java.util.Collection;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Random;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.TimeUnit;
55
import java.util.concurrent.atomic.AtomicInteger;
56
import java.util.logging.Level;
57
import java.util.logging.Logger;
58

59
/**
60
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over
61
 * the {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
62
 * determined by backend metrics using ORCA.
63
 */
64
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
65
final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
66
  private static final Logger log = Logger.getLogger(
1✔
67
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
68
  private WeightedRoundRobinLoadBalancerConfig config;
69
  private final SynchronizationContext syncContext;
70
  private final ScheduledExecutorService timeService;
71
  private ScheduledHandle weightUpdateTimer;
72
  private final Runnable updateWeightTask;
73
  private final AtomicInteger sequence;
74
  private final long infTime;
75
  private final Ticker ticker;
76

77
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
78
    this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, new Random());
1✔
79
  }
1✔
80

81
  public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) {
82
    super(helper);
1✔
83
    helper.setLoadBalancer(this);
1✔
84
    this.ticker = checkNotNull(ticker, "ticker");
1✔
85
    this.infTime = ticker.nanoTime() + Long.MAX_VALUE;
1✔
86
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
87
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
88
    this.updateWeightTask = new UpdateWeightTask();
1✔
89
    this.sequence = new AtomicInteger(random.nextInt());
1✔
90
    log.log(Level.FINE, "weighted_round_robin LB created");
1✔
91
  }
1✔
92

93
  @VisibleForTesting
94
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
95
    this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, random);
1✔
96
  }
1✔
97

98
  @Override
99
  protected ChildLbState createChildLbState(Object key, Object policyConfig,
100
      SubchannelPicker initialPicker, ResolvedAddresses unused) {
101
    ChildLbState childLbState = new WeightedChildLbState(key, pickFirstLbProvider, policyConfig,
1✔
102
        initialPicker);
103
    return childLbState;
1✔
104
  }
105

106
  @Override
107
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
108
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
109
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
110
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
111
                      + resolvedAddresses.getAddresses()
1✔
112
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
113
      handleNameResolutionError(unavailableStatus);
1✔
114
      return unavailableStatus;
1✔
115
    }
116
    config =
1✔
117
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
118
    AcceptResolvedAddressRetVal acceptRetVal;
119
    try {
120
      resolvingAddresses = true;
1✔
121
      acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
122
      if (!acceptRetVal.status.isOk()) {
1✔
123
        return acceptRetVal.status;
×
124
      }
125

126
      if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
127
        weightUpdateTimer.cancel();
1✔
128
      }
129
      updateWeightTask.run();
1✔
130

131
      createAndApplyOrcaListeners();
1✔
132

133
      // Must update channel picker before return so that new RPCs will not be routed to deleted
134
      // clusters and resolver can remove them in service config.
135
      updateOverallBalancingState();
1✔
136

137
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
138
    } finally {
139
      resolvingAddresses = false;
1✔
140
    }
141

142
    return acceptRetVal.status;
1✔
143
  }
144

145
  @Override
146
  public RoundRobinPicker createReadyPicker(Collection<ChildLbState> activeList) {
147
    return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
148
        config.enableOobLoadReport, config.errorUtilizationPenalty);
149
  }
150

151
  // Expose for tests in this package.
152
  @Override
153
  protected ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
154
    return super.getChildLbStateEag(eag);
1✔
155
  }
156

157
  @VisibleForTesting
158
  final class WeightedChildLbState extends ChildLbState {
159

160
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
161
    private volatile long lastUpdated;
162
    private volatile long nonEmptySince;
163
    private volatile double weight = 0;
1✔
164

165
    private OrcaReportListener orcaReportListener;
166

167
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Object childConfig,
168
        SubchannelPicker initialPicker) {
1✔
169
      super(key, policyProvider, childConfig, initialPicker);
1✔
170
    }
1✔
171

172
    private double getWeight() {
173
      if (config == null) {
1✔
174
        return 0;
×
175
      }
176
      long now = ticker.nanoTime();
1✔
177
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
178
        nonEmptySince = infTime;
1✔
179
        return 0;
1✔
180
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
181
          && config.blackoutPeriodNanos > 0) {
1✔
182
        return 0;
1✔
183
      } else {
184
        return weight;
1✔
185
      }
186
    }
187

188
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
189
      subchannels.add(wrrSubchannel);
1✔
190
    }
1✔
191

192
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
193
      if (orcaReportListener != null
1✔
194
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
195
        return orcaReportListener;
1✔
196
      }
197
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
198
      return orcaReportListener;
1✔
199
    }
200

201
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
202
      subchannels.remove(wrrSubchannel);
1✔
203
    }
1✔
204

205
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
206
      private final float errorUtilizationPenalty;
207

208
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
209
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
210
      }
1✔
211

212
      @Override
213
      public void onLoadReport(MetricReport report) {
214
        double newWeight = 0;
1✔
215
        // Prefer application utilization and fallback to CPU utilization if unset.
216
        double utilization =
217
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
218
                : report.getCpuUtilization();
1✔
219
        if (utilization > 0 && report.getQps() > 0) {
1✔
220
          double penalty = 0;
1✔
221
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
222
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
223
          }
224
          newWeight = report.getQps() / (utilization + penalty);
1✔
225
        }
226
        if (newWeight == 0) {
1✔
227
          return;
1✔
228
        }
229
        if (nonEmptySince == infTime) {
1✔
230
          nonEmptySince = ticker.nanoTime();
1✔
231
        }
232
        lastUpdated = ticker.nanoTime();
1✔
233
        weight = newWeight;
1✔
234
      }
1✔
235
    }
236
  }
237

238
  private final class UpdateWeightTask implements Runnable {
1✔
239
    @Override
240
    public void run() {
241
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
242
        ((WeightedRoundRobinPicker) currentPicker).updateWeight();
1✔
243
      }
244
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
245
          TimeUnit.NANOSECONDS, timeService);
1✔
246
    }
1✔
247
  }
248

249
  private void createAndApplyOrcaListeners() {
250
    for (ChildLbState child : getChildLbStates()) {
1✔
251
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
252
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
253
        if (config.enableOobLoadReport) {
1✔
254
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
255
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
256
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
257
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
258
                  .build());
1✔
259
        } else {
260
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
261
        }
262
      }
1✔
263
    }
1✔
264
  }
1✔
265

266
  @Override
267
  public void shutdown() {
268
    if (weightUpdateTimer != null) {
1✔
269
      weightUpdateTimer.cancel();
1✔
270
    }
271
    super.shutdown();
1✔
272
  }
1✔
273

274
  private static final class WrrHelper extends ForwardingLoadBalancerHelper {
275
    private final Helper delegate;
276
    private WeightedRoundRobinLoadBalancer wrr;
277

278
    WrrHelper(Helper helper) {
1✔
279
      this.delegate = helper;
1✔
280
    }
1✔
281

282
    void setLoadBalancer(WeightedRoundRobinLoadBalancer lb) {
283
      this.wrr = lb;
1✔
284
    }
1✔
285

286
    @Override
287
    protected Helper delegate() {
288
      return delegate;
1✔
289
    }
290

291
    @Override
292
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
293
      checkElementIndex(0, args.getAddresses().size(), "Empty address group");
1✔
294
      WeightedChildLbState childLbState =
1✔
295
          (WeightedChildLbState) wrr.getChildLbStateEag(args.getAddresses().get(0));
1✔
296
      return wrr.new WrrSubchannel(delegate().createSubchannel(args), childLbState);
1✔
297
    }
298
  }
299

300
  @VisibleForTesting
301
  final class WrrSubchannel extends ForwardingSubchannel {
302
    private final Subchannel delegate;
303
    private final WeightedChildLbState owner;
304

305
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
306
      this.delegate = checkNotNull(delegate, "delegate");
1✔
307
      this.owner = checkNotNull(owner, "owner");
1✔
308
    }
1✔
309

310
    @Override
311
    public void start(SubchannelStateListener listener) {
312
      owner.addSubchannel(this);
1✔
313
      delegate().start(new SubchannelStateListener() {
1✔
314
        @Override
315
        public void onSubchannelState(ConnectivityStateInfo newState) {
316
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
317
            owner.nonEmptySince = infTime;
1✔
318
          }
319
          listener.onSubchannelState(newState);
1✔
320
        }
1✔
321
      });
322
    }
1✔
323

324
    @Override
325
    protected Subchannel delegate() {
326
      return delegate;
1✔
327
    }
328

329
    @Override
330
    public void shutdown() {
331
      super.shutdown();
1✔
332
      owner.removeSubchannel(this);
1✔
333
    }
1✔
334
  }
335

336
  @VisibleForTesting
337
  final class WeightedRoundRobinPicker extends RoundRobinPicker {
338
    private final List<ChildLbState> children;
339
    private final Map<Subchannel, OrcaPerRequestReportListener> subchannelToReportListenerMap =
1✔
340
        new HashMap<>();
341
    private final boolean enableOobLoadReport;
342
    private final float errorUtilizationPenalty;
343
    private volatile StaticStrideScheduler scheduler;
344

345
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
346
        float errorUtilizationPenalty) {
1✔
347
      checkNotNull(children, "children");
1✔
348
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
349
      this.children = children;
1✔
350
      for (ChildLbState child : children) {
1✔
351
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
352
        for (WrrSubchannel subchannel : wChild.subchannels) {
1✔
353
          this.subchannelToReportListenerMap
1✔
354
              .put(subchannel, wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
355
        }
1✔
356
      }
1✔
357
      this.enableOobLoadReport = enableOobLoadReport;
1✔
358
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
359
      updateWeight();
1✔
360
    }
1✔
361

362
    @Override
363
    public PickResult pickSubchannel(PickSubchannelArgs args) {
364
      ChildLbState childLbState = children.get(scheduler.pick());
1✔
365
      WeightedChildLbState wChild = (WeightedChildLbState) childLbState;
1✔
366
      PickResult pickResult = childLbState.getCurrentPicker().pickSubchannel(args);
1✔
367
      Subchannel subchannel = pickResult.getSubchannel();
1✔
368
      if (!enableOobLoadReport) {
1✔
369
        return PickResult.withSubchannel(subchannel,
1✔
370
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
371
                subchannelToReportListenerMap.getOrDefault(subchannel,
1✔
372
                    wChild.getOrCreateOrcaListener(errorUtilizationPenalty))));
1✔
373
      } else {
374
        return PickResult.withSubchannel(subchannel);
1✔
375
      }
376
    }
377

378
    private void updateWeight() {
379
      float[] newWeights = new float[children.size()];
1✔
380
      for (int i = 0; i < children.size(); i++) {
1✔
381
        double newWeight = ((WeightedChildLbState)children.get(i)).getWeight();
1✔
382
        newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
383
      }
384
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
385
    }
1✔
386

387
    @Override
388
    public String toString() {
389
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
390
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
391
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
392
          .add("list", children).toString();
1✔
393
    }
394

395
    @VisibleForTesting
396
    List<ChildLbState> getChildren() {
397
      return children;
1✔
398
    }
399

400
    @Override
401
    public boolean isEquivalentTo(RoundRobinPicker picker) {
402
      if (!(picker instanceof WeightedRoundRobinPicker)) {
1✔
403
        return false;
×
404
      }
405
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) picker;
1✔
406
      if (other == this) {
1✔
407
        return true;
×
408
      }
409
      // the lists cannot contain duplicate subchannels
410
      return enableOobLoadReport == other.enableOobLoadReport
1✔
411
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
412
          && children.size() == other.children.size() && new HashSet<>(
1✔
413
          children).containsAll(other.children);
1✔
414
    }
415
  }
416

417
  /*
418
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
419
   * in which each object's deadline is the multiplicative inverse of the object's weight.
420
   * <p>
421
   * The way in which this is implemented is through a static stride scheduler. 
422
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
423
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
424
   * with higher weights. It is based on the observation that the intended sequence generated 
425
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
426
   * The Static Stride Scheduler is more performant than other implementations of the EDF
427
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
428
   * <p>
429
   * go/static-stride-scheduler
430
   * <p>
431
   *
432
   * <ul>
433
   *  <li>nextSequence() - O(1)
434
   *  <li>pick() - O(n)
435
   */
436
  @VisibleForTesting
437
  static final class StaticStrideScheduler {
438
    private final short[] scaledWeights;
439
    private final AtomicInteger sequence;
440
    private static final int K_MAX_WEIGHT = 0xFFFF;
441

442
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
443
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
444
    //
445
    // This is done as a performance optimization by limiting the number of rounds for picks
446
    // for edge cases where channels have large differences in subchannel weights.
447
    // In this case, without these clips, it would potentially require the scheduler to
448
    // frequently traverse through the entire subchannel list within the pick method.
449
    //
450
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
451
    // decrease the amount of sequences that the scheduler must traverse through in order
452
    // to pick a high weight subchannel in such corner cases.
453
    // But, it also makes WeightedRoundRobin to send slightly more requests to
454
    // potentially very bad tasks (that would have near-zero weights) than zero.
455
    // This is not necessarily a downside, though. Perhaps this is not a problem at
456
    // all, and we can increase this value if needed to save CPU cycles.
457
    private static final double K_MAX_RATIO = 10;
458
    private static final double K_MIN_RATIO = 0.1;
459

460
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
461
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
462
      int numChannels = weights.length;
1✔
463
      int numWeightedChannels = 0;
1✔
464
      double sumWeight = 0;
1✔
465
      double unscaledMeanWeight;
466
      float unscaledMaxWeight = 0;
1✔
467
      for (float weight : weights) {
1✔
468
        if (weight > 0) {
1✔
469
          sumWeight += weight;
1✔
470
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
471
          numWeightedChannels++;
1✔
472
        }
473
      }
474

475
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
476
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
477
      if (numWeightedChannels > 0) {
1✔
478
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
479
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
480
      } else {
481
        // Fall back to round robin if all values are non-positives
482
        unscaledMeanWeight = 1;
1✔
483
        unscaledMaxWeight = 1;
1✔
484
      }
485

486
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
487
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
488
      // match the actual mean of the values that end up in the scheduler.
489
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
490
      // We compute weightLowerBound and clamp it to 1 from below so that in the
491
      // worst case, we represent tiny weights as 1.
492
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
493
      short[] scaledWeights = new short[numChannels];
1✔
494
      for (int i = 0; i < numChannels; i++) {
1✔
495
        if (weights[i] <= 0) {
1✔
496
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
497
        } else {
498
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
499
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
500
        }
501
      }
502

503
      this.scaledWeights = scaledWeights;
1✔
504
      this.sequence = sequence;
1✔
505
    }
1✔
506

507
    /** Returns the next sequence number and atomically increases sequence with wraparound. */
508
    private long nextSequence() {
509
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
510
    }
511

512
    /*
513
     * Selects index of next backend server.
514
     * <p>
515
     * A 2D array is compactly represented as a function of W(backend), where the row
516
     * represents the generation and the column represents the backend index:
517
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
518
     * Each element in the conceptual array is a boolean indicating whether the backend at
519
     * this index should be picked now. If false, the counter is incremented again,
520
     * and the new element is checked. An atomically incremented counter keeps track of our
521
     * backend and generation through modular arithmetic within the pick() method.
522
     * <p>
523
     * Modular arithmetic allows us to evenly distribute picks and skips between
524
     * generations based on W(backend).
525
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
526
     * If we have the same three backends with weights:
527
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
528
     * <p>
529
     * B0    B1    B2
530
     * T     T     T
531
     * F     F     T
532
     * F     T     T
533
     * T     F     T
534
     * F     T     T
535
     * F     F     T
536
     * The sequence of picked backend indices is given by
537
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
538
     * <p>
539
     * To reduce the variance and spread the wasted work among different picks,
540
     * an offset that varies per backend index is also included to the calculation.
541
     */
542
    int pick() {
543
      while (true) {
544
        long sequence = this.nextSequence();
1✔
545
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
546
        long generation = sequence / scaledWeights.length;
1✔
547
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
548
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
549
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
550
          continue;
1✔
551
        }
552
        return backendIndex;
1✔
553
      }
554
    }
555
  }
556

557
  static final class WeightedRoundRobinLoadBalancerConfig {
558
    final long blackoutPeriodNanos;
559
    final long weightExpirationPeriodNanos;
560
    final boolean enableOobLoadReport;
561
    final long oobReportingPeriodNanos;
562
    final long weightUpdatePeriodNanos;
563
    final float errorUtilizationPenalty;
564

565
    public static Builder newBuilder() {
566
      return new Builder();
1✔
567
    }
568

569
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
570
                                                 long weightExpirationPeriodNanos,
571
                                                 boolean enableOobLoadReport,
572
                                                 long oobReportingPeriodNanos,
573
                                                 long weightUpdatePeriodNanos,
574
                                                 float errorUtilizationPenalty) {
1✔
575
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
576
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
577
      this.enableOobLoadReport = enableOobLoadReport;
1✔
578
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
579
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
580
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
581
    }
1✔
582

583
    static final class Builder {
584
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
585
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
586
      boolean enableOobLoadReport = false;
1✔
587
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
588
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
589
      float errorUtilizationPenalty = 1.0F;
1✔
590

591
      private Builder() {
1✔
592

593
      }
1✔
594

595
      @SuppressWarnings("UnusedReturnValue")
596
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
597
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
598
        return this;
1✔
599
      }
600

601
      @SuppressWarnings("UnusedReturnValue")
602
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
603
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
604
        return this;
1✔
605
      }
606

607
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
608
        this.enableOobLoadReport = enableOobLoadReport;
1✔
609
        return this;
1✔
610
      }
611

612
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
613
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
614
        return this;
1✔
615
      }
616

617
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
618
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
619
        return this;
1✔
620
      }
621

622
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
623
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
624
        return this;
1✔
625
      }
626

627
      WeightedRoundRobinLoadBalancerConfig build() {
628
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
629
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
630
                weightUpdatePeriodNanos, errorUtilizationPenalty);
631
      }
632
    }
633
  }
634
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc