• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19219

09 May 2024 04:01PM UTC coverage: 88.408% (+0.02%) from 88.392%
#19219

push

github

ejona86
xds: Include locality label in WRR metrics (#11170)

31589 of 35731 relevant lines covered (88.41%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.32
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkElementIndex;
21
import static com.google.common.base.Preconditions.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.Lists;
28
import io.grpc.ConnectivityState;
29
import io.grpc.ConnectivityStateInfo;
30
import io.grpc.Deadline.Ticker;
31
import io.grpc.DoubleHistogramMetricInstrument;
32
import io.grpc.EquivalentAddressGroup;
33
import io.grpc.ExperimentalApi;
34
import io.grpc.LoadBalancer;
35
import io.grpc.LoadBalancerProvider;
36
import io.grpc.LongCounterMetricInstrument;
37
import io.grpc.MetricInstrumentRegistry;
38
import io.grpc.NameResolver;
39
import io.grpc.Status;
40
import io.grpc.SynchronizationContext;
41
import io.grpc.SynchronizationContext.ScheduledHandle;
42
import io.grpc.services.MetricReport;
43
import io.grpc.util.ForwardingLoadBalancerHelper;
44
import io.grpc.util.ForwardingSubchannel;
45
import io.grpc.util.RoundRobinLoadBalancer;
46
import io.grpc.xds.orca.OrcaOobUtil;
47
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
48
import io.grpc.xds.orca.OrcaPerRequestUtil;
49
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
50
import java.util.Collection;
51
import java.util.HashMap;
52
import java.util.HashSet;
53
import java.util.List;
54
import java.util.Map;
55
import java.util.Random;
56
import java.util.Set;
57
import java.util.concurrent.ScheduledExecutorService;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicInteger;
60
import java.util.logging.Level;
61
import java.util.logging.Logger;
62

63
/**
64
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
65
 * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
66
 * determined by backend metrics using ORCA.
67
 */
68
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
69
final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
70

71
  private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
72
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
73
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
74
  private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
75
  private static final Logger log = Logger.getLogger(
1✔
76
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
77
  private WeightedRoundRobinLoadBalancerConfig config;
78
  private final SynchronizationContext syncContext;
79
  private final ScheduledExecutorService timeService;
80
  private ScheduledHandle weightUpdateTimer;
81
  private final Runnable updateWeightTask;
82
  private final AtomicInteger sequence;
83
  private final long infTime;
84
  private final Ticker ticker;
85
  private String locality = "";
1✔
86

87
  // The metric instruments are only registered once and shared by all instances of this LB.
88
  static {
    // Runs once at class initialization, so every instance of this LB shares the instruments.
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    RR_FALLBACK_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.rr_fallback",
        "Number of scheduler updates in which there were not enough endpoints with valid "
            + "weight, which caused the WRR policy to fall back to RR behavior",
        "update",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);

    ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
        "Number of endpoints from each scheduler update that don't yet have usable weight "
            + "information",
        "endpoint",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);

    ENDPOINT_WEIGHT_STALE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_stale",
        "Number of endpoints from each scheduler update whose latest weight is older than the "
            + "expiration period",
        "endpoint",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);

    ENDPOINT_WEIGHTS_HISTOGRAM = registry.registerDoubleHistogram(
        "grpc.lb.wrr.endpoint_weights",
        "The histogram buckets will be endpoint weight ranges.",
        "weight",
        Lists.newArrayList(),
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);
  }
1✔
111

112
  /**
   * Creates a balancer whose helper is wrapped by {@link OrcaOobUtil#newOrcaReportingHelper} and
   * {@code WrrHelper}, and whose pick sequence is seeded from a fresh {@link Random}.
   */
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
    this(
        new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)),
        ticker,
        new Random());
  }
1✔
115

116
  /**
   * Creates a balancer on top of an already wrapped helper.
   *
   * @param helper passed to the parent class and linked back to this balancer
   * @param ticker time source read for weight bookkeeping; must not be null
   * @param random supplies the initial value of the shared pick sequence
   */
  public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) {
    super(helper);
    // The helper needs the balancer to resolve child state when it creates subchannels.
    helper.setLoadBalancer(this);

    this.ticker = checkNotNull(ticker, "ticker");
    // Marker value stored in nonEmptySince while a child has no usable weight report.
    this.infTime = ticker.nanoTime() + Long.MAX_VALUE;
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
    this.updateWeightTask = new UpdateWeightTask();
    this.sequence = new AtomicInteger(random.nextInt());

    log.log(Level.FINE, "weighted_round_robin LB created");
  }
1✔
127

128
  /**
   * Same wrapping as the two-argument constructor, but the caller supplies the {@link Random}
   * that seeds the pick sequence.
   */
  @VisibleForTesting
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
    this(
        new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)),
        ticker,
        random);
  }
1✔
132

133
  @Override
134
  protected ChildLbState createChildLbState(Object key, Object policyConfig,
135
      SubchannelPicker initialPicker, ResolvedAddresses unused) {
136
    ChildLbState childLbState = new WeightedChildLbState(key, pickFirstLbProvider, policyConfig,
1✔
137
        initialPicker);
138
    return childLbState;
1✔
139
  }
140

141
  @Override
142
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
143
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
144
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
145
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
146
                      + resolvedAddresses.getAddresses()
1✔
147
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
148
      handleNameResolutionError(unavailableStatus);
1✔
149
      return unavailableStatus;
1✔
150
    }
151
    String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
1✔
152
    if (locality != null) {
1✔
153
      this.locality = locality;
1✔
154
    } else {
155
      this.locality = "";
1✔
156
    }
157
    config =
1✔
158
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
159
    AcceptResolvedAddrRetVal acceptRetVal;
160
    try {
161
      resolvingAddresses = true;
1✔
162
      acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
163
      if (!acceptRetVal.status.isOk()) {
1✔
164
        return acceptRetVal.status;
×
165
      }
166

167
      if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
168
        weightUpdateTimer.cancel();
1✔
169
      }
170
      updateWeightTask.run();
1✔
171

172
      createAndApplyOrcaListeners();
1✔
173

174
      // Must update channel picker before return so that new RPCs will not be routed to deleted
175
      // clusters and resolver can remove them in service config.
176
      updateOverallBalancingState();
1✔
177

178
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
179
    } finally {
180
      resolvingAddresses = false;
1✔
181
    }
182

183
    return acceptRetVal.status;
1✔
184
  }
185

186
  @Override
187
  public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
188
    return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
189
        config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper(),
1✔
190
        locality);
191
  }
192

193
  @VisibleForTesting
194
  final class WeightedChildLbState extends ChildLbState {
195

196
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
197
    private volatile long lastUpdated;
198
    private volatile long nonEmptySince;
199
    private volatile double weight = 0;
1✔
200

201
    private OrcaReportListener orcaReportListener;
202

203
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Object childConfig,
204
        SubchannelPicker initialPicker) {
1✔
205
      super(key, policyProvider, childConfig, initialPicker);
1✔
206
    }
1✔
207

208
    private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
209
      if (config == null) {
1✔
210
        return 0;
×
211
      }
212
      long now = ticker.nanoTime();
1✔
213
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
214
        nonEmptySince = infTime;
1✔
215
        staleEndpoints.incrementAndGet();
1✔
216
        return 0;
1✔
217
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
218
          && config.blackoutPeriodNanos > 0) {
1✔
219
        notYetUsableEndpoints.incrementAndGet();
1✔
220
        return 0;
1✔
221
      } else {
222
        return weight;
1✔
223
      }
224
    }
225

226
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
227
      subchannels.add(wrrSubchannel);
1✔
228
    }
1✔
229

230
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
231
      if (orcaReportListener != null
1✔
232
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
233
        return orcaReportListener;
1✔
234
      }
235
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
236
      return orcaReportListener;
1✔
237
    }
238

239
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
240
      subchannels.remove(wrrSubchannel);
1✔
241
    }
1✔
242

243
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
244
      private final float errorUtilizationPenalty;
245

246
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
247
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
248
      }
1✔
249

250
      @Override
251
      public void onLoadReport(MetricReport report) {
252
        double newWeight = 0;
1✔
253
        // Prefer application utilization and fallback to CPU utilization if unset.
254
        double utilization =
255
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
256
                : report.getCpuUtilization();
1✔
257
        if (utilization > 0 && report.getQps() > 0) {
1✔
258
          double penalty = 0;
1✔
259
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
260
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
261
          }
262
          newWeight = report.getQps() / (utilization + penalty);
1✔
263
        }
264
        if (newWeight == 0) {
1✔
265
          return;
1✔
266
        }
267
        if (nonEmptySince == infTime) {
1✔
268
          nonEmptySince = ticker.nanoTime();
1✔
269
        }
270
        lastUpdated = ticker.nanoTime();
1✔
271
        weight = newWeight;
1✔
272
      }
1✔
273
    }
274
  }
275

276
  private final class UpdateWeightTask implements Runnable {
1✔
277
    @Override
278
    public void run() {
279
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
280
        ((WeightedRoundRobinPicker) currentPicker).updateWeight();
1✔
281
      }
282
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
283
          TimeUnit.NANOSECONDS, timeService);
1✔
284
    }
1✔
285
  }
286

287
  private void createAndApplyOrcaListeners() {
288
    for (ChildLbState child : getChildLbStates()) {
1✔
289
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
290
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
291
        if (config.enableOobLoadReport) {
1✔
292
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
293
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
294
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
295
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
296
                  .build());
1✔
297
        } else {
298
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
299
        }
300
      }
1✔
301
    }
1✔
302
  }
1✔
303

304
  @Override
305
  public void shutdown() {
306
    if (weightUpdateTimer != null) {
1✔
307
      weightUpdateTimer.cancel();
1✔
308
    }
309
    super.shutdown();
1✔
310
  }
1✔
311

312
  private static final class WrrHelper extends ForwardingLoadBalancerHelper {
313
    private final Helper delegate;
314
    private WeightedRoundRobinLoadBalancer wrr;
315

316
    WrrHelper(Helper helper) {
1✔
317
      this.delegate = helper;
1✔
318
    }
1✔
319

320
    void setLoadBalancer(WeightedRoundRobinLoadBalancer lb) {
321
      this.wrr = lb;
1✔
322
    }
1✔
323

324
    @Override
325
    protected Helper delegate() {
326
      return delegate;
1✔
327
    }
328

329
    @Override
330
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
331
      checkElementIndex(0, args.getAddresses().size(), "Empty address group");
1✔
332
      WeightedChildLbState childLbState =
1✔
333
          (WeightedChildLbState) wrr.getChildLbStateEag(args.getAddresses().get(0));
1✔
334
      return wrr.new WrrSubchannel(delegate().createSubchannel(args), childLbState);
1✔
335
    }
336
  }
337

338
  @VisibleForTesting
339
  final class WrrSubchannel extends ForwardingSubchannel {
340
    private final Subchannel delegate;
341
    private final WeightedChildLbState owner;
342

343
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
344
      this.delegate = checkNotNull(delegate, "delegate");
1✔
345
      this.owner = checkNotNull(owner, "owner");
1✔
346
    }
1✔
347

348
    @Override
349
    public void start(SubchannelStateListener listener) {
350
      owner.addSubchannel(this);
1✔
351
      delegate().start(new SubchannelStateListener() {
1✔
352
        @Override
353
        public void onSubchannelState(ConnectivityStateInfo newState) {
354
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
355
            owner.nonEmptySince = infTime;
1✔
356
          }
357
          listener.onSubchannelState(newState);
1✔
358
        }
1✔
359
      });
360
    }
1✔
361

362
    @Override
363
    protected Subchannel delegate() {
364
      return delegate;
1✔
365
    }
366

367
    @Override
368
    public void shutdown() {
369
      super.shutdown();
1✔
370
      owner.removeSubchannel(this);
1✔
371
    }
1✔
372
  }
373

374
  @VisibleForTesting
375
  static final class WeightedRoundRobinPicker extends SubchannelPicker {
376
    private final List<ChildLbState> children;
377
    private final Map<Subchannel, OrcaPerRequestReportListener> subchannelToReportListenerMap =
1✔
378
        new HashMap<>();
379
    private final boolean enableOobLoadReport;
380
    private final float errorUtilizationPenalty;
381
    private final AtomicInteger sequence;
382
    private final int hashCode;
383
    private final LoadBalancer.Helper helper;
384
    private final String locality;
385
    private volatile StaticStrideScheduler scheduler;
386

387
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
388
        float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper,
389
        String locality) {
1✔
390
      checkNotNull(children, "children");
1✔
391
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
392
      this.children = children;
1✔
393
      for (ChildLbState child : children) {
1✔
394
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
395
        for (WrrSubchannel subchannel : wChild.subchannels) {
1✔
396
          this.subchannelToReportListenerMap
1✔
397
              .put(subchannel, wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
398
        }
1✔
399
      }
1✔
400
      this.enableOobLoadReport = enableOobLoadReport;
1✔
401
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
402
      this.sequence = checkNotNull(sequence, "sequence");
1✔
403
      this.helper = helper;
1✔
404
      this.locality = checkNotNull(locality, "locality");
1✔
405

406
      // For equality we treat children as a set; use hash code as defined by Set
407
      int sum = 0;
1✔
408
      for (ChildLbState child : children) {
1✔
409
        sum += child.hashCode();
1✔
410
      }
1✔
411
      this.hashCode = sum
1✔
412
          ^ Boolean.hashCode(enableOobLoadReport)
1✔
413
          ^ Float.hashCode(errorUtilizationPenalty);
1✔
414

415
      updateWeight();
1✔
416
    }
1✔
417

418
    @Override
419
    public PickResult pickSubchannel(PickSubchannelArgs args) {
420
      ChildLbState childLbState = children.get(scheduler.pick());
1✔
421
      WeightedChildLbState wChild = (WeightedChildLbState) childLbState;
1✔
422
      PickResult pickResult = childLbState.getCurrentPicker().pickSubchannel(args);
1✔
423
      Subchannel subchannel = pickResult.getSubchannel();
1✔
424
      if (subchannel == null) {
1✔
425
        return pickResult;
1✔
426
      }
427
      if (!enableOobLoadReport) {
1✔
428
        return PickResult.withSubchannel(subchannel,
1✔
429
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
430
                subchannelToReportListenerMap.getOrDefault(subchannel,
1✔
431
                    wChild.getOrCreateOrcaListener(errorUtilizationPenalty))));
1✔
432
      } else {
433
        return PickResult.withSubchannel(subchannel);
1✔
434
      }
435
    }
436

437
    private void updateWeight() {
438
      float[] newWeights = new float[children.size()];
1✔
439
      AtomicInteger staleEndpoints = new AtomicInteger();
1✔
440
      AtomicInteger notYetUsableEndpoints = new AtomicInteger();
1✔
441
      for (int i = 0; i < children.size(); i++) {
1✔
442
        double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(staleEndpoints,
1✔
443
            notYetUsableEndpoints);
444
        // TODO: add locality label once available
445
        helper.getMetricRecorder()
1✔
446
            .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
1✔
447
                ImmutableList.of(helper.getChannelTarget()),
1✔
448
                ImmutableList.of(locality));
1✔
449
        newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
450
      }
451
      if (staleEndpoints.get() > 0) {
1✔
452
        // TODO: add locality label once available
453
        helper.getMetricRecorder()
1✔
454
            .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
1✔
455
                ImmutableList.of(helper.getChannelTarget()),
1✔
456
                ImmutableList.of(locality));
1✔
457
      }
458
      if (notYetUsableEndpoints.get() > 0) {
1✔
459
        // TODO: add locality label once available
460
        helper.getMetricRecorder()
1✔
461
            .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
1✔
462
                ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
1✔
463
      }
464

465
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
466
      if (this.scheduler.usesRoundRobin()) {
1✔
467
        // TODO: locality label once available
468
        helper.getMetricRecorder()
1✔
469
            .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
1✔
470
                ImmutableList.of(locality));
1✔
471
      }
472
    }
1✔
473

474
    @Override
475
    public String toString() {
476
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
477
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
478
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
479
          .add("list", children).toString();
1✔
480
    }
481

482
    @VisibleForTesting
483
    List<ChildLbState> getChildren() {
484
      return children;
1✔
485
    }
486

487
    @Override
488
    public int hashCode() {
489
      return hashCode;
×
490
    }
491

492
    @Override
493
    public boolean equals(Object o) {
494
      if (!(o instanceof WeightedRoundRobinPicker)) {
1✔
495
        return false;
×
496
      }
497
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) o;
1✔
498
      if (other == this) {
1✔
499
        return true;
×
500
      }
501
      // the lists cannot contain duplicate subchannels
502
      return hashCode == other.hashCode
1✔
503
          && sequence == other.sequence
504
          && enableOobLoadReport == other.enableOobLoadReport
505
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
506
          && children.size() == other.children.size()
1✔
507
          && new HashSet<>(children).containsAll(other.children);
1✔
508
    }
509
  }
510

511
  /*
512
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
513
   * in which each object's deadline is the multiplicative inverse of the object's weight.
514
   * <p>
515
   * The way in which this is implemented is through a static stride scheduler. 
516
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
517
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
518
   * with higher weights. It is based on the observation that the intended sequence generated 
519
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
520
   * The Static Stride Scheduler is more performant than other implementations of the EDF
521
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
522
   * <p>
523
   * go/static-stride-scheduler
524
   * <p>
525
   *
526
   * <ul>
527
   *  <li>nextSequence() - O(1)
528
   *  <li>pick() - O(n)
529
   */
530
  @VisibleForTesting
531
  static final class StaticStrideScheduler {
532
    private final short[] scaledWeights;
533
    private final AtomicInteger sequence;
534
    private final boolean usesRoundRobin;
535
    private static final int K_MAX_WEIGHT = 0xFFFF;
536

537
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
538
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
539
    //
540
    // This is done as a performance optimization by limiting the number of rounds for picks
541
    // for edge cases where channels have large differences in subchannel weights.
542
    // In this case, without these clips, it would potentially require the scheduler to
543
    // frequently traverse through the entire subchannel list within the pick method.
544
    //
545
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
546
    // decrease the amount of sequences that the scheduler must traverse through in order
547
    // to pick a high weight subchannel in such corner cases.
548
    // But, it also makes WeightedRoundRobin to send slightly more requests to
549
    // potentially very bad tasks (that would have near-zero weights) than zero.
550
    // This is not necessarily a downside, though. Perhaps this is not a problem at
551
    // all, and we can increase this value if needed to save CPU cycles.
552
    private static final double K_MAX_RATIO = 10;
553
    private static final double K_MIN_RATIO = 0.1;
554

555
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
556
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
557
      int numChannels = weights.length;
1✔
558
      int numWeightedChannels = 0;
1✔
559
      double sumWeight = 0;
1✔
560
      double unscaledMeanWeight;
561
      float unscaledMaxWeight = 0;
1✔
562
      for (float weight : weights) {
1✔
563
        if (weight > 0) {
1✔
564
          sumWeight += weight;
1✔
565
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
566
          numWeightedChannels++;
1✔
567
        }
568
      }
569

570
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
571
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
572
      if (numWeightedChannels > 0) {
1✔
573
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
574
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
575
        usesRoundRobin = false;
1✔
576
      } else {
577
        // Fall back to round robin if all values are non-positives
578
        usesRoundRobin = true;
1✔
579
        unscaledMeanWeight = 1;
1✔
580
        unscaledMaxWeight = 1;
1✔
581
      }
582

583
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
584
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
585
      // match the actual mean of the values that end up in the scheduler.
586
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
587
      // We compute weightLowerBound and clamp it to 1 from below so that in the
588
      // worst case, we represent tiny weights as 1.
589
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
590
      short[] scaledWeights = new short[numChannels];
1✔
591
      for (int i = 0; i < numChannels; i++) {
1✔
592
        if (weights[i] <= 0) {
1✔
593
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
594
        } else {
595
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
596
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
597
        }
598
      }
599

600
      this.scaledWeights = scaledWeights;
1✔
601
      this.sequence = sequence;
1✔
602
    }
1✔
603

604
    // Without properly weighted channels, we do plain vanilla round_robin.
605
    boolean usesRoundRobin() {
606
      return usesRoundRobin;
1✔
607
    }
608

609
    /**
610
     * Returns the next sequence number and atomically increases sequence with wraparound.
611
     */
612
    private long nextSequence() {
613
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
614
    }
615

616
    /*
617
     * Selects index of next backend server.
618
     * <p>
619
     * A 2D array is compactly represented as a function of W(backend), where the row
620
     * represents the generation and the column represents the backend index:
621
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
622
     * Each element in the conceptual array is a boolean indicating whether the backend at
623
     * this index should be picked now. If false, the counter is incremented again,
624
     * and the new element is checked. An atomically incremented counter keeps track of our
625
     * backend and generation through modular arithmetic within the pick() method.
626
     * <p>
627
     * Modular arithmetic allows us to evenly distribute picks and skips between
628
     * generations based on W(backend).
629
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
630
     * If we have the same three backends with weights:
631
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
632
     * <p>
633
     * B0    B1    B2
634
     * T     T     T
635
     * F     F     T
636
     * F     T     T
637
     * T     F     T
638
     * F     T     T
639
     * F     F     T
640
     * The sequence of picked backend indices is given by
641
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
642
     * <p>
643
     * To reduce the variance and spread the wasted work among different picks,
644
     * an offset that varies per backend index is also included to the calculation.
645
     */
646
    int pick() {
647
      while (true) {
648
        long sequence = this.nextSequence();
1✔
649
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
650
        long generation = sequence / scaledWeights.length;
1✔
651
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
652
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
653
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
654
          continue;
1✔
655
        }
656
        return backendIndex;
1✔
657
      }
658
    }
659
  }
660

661
  static final class WeightedRoundRobinLoadBalancerConfig {
662
    final long blackoutPeriodNanos;
663
    final long weightExpirationPeriodNanos;
664
    final boolean enableOobLoadReport;
665
    final long oobReportingPeriodNanos;
666
    final long weightUpdatePeriodNanos;
667
    final float errorUtilizationPenalty;
668

669
    public static Builder newBuilder() {
670
      return new Builder();
1✔
671
    }
672

673
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
674
                                                 long weightExpirationPeriodNanos,
675
                                                 boolean enableOobLoadReport,
676
                                                 long oobReportingPeriodNanos,
677
                                                 long weightUpdatePeriodNanos,
678
                                                 float errorUtilizationPenalty) {
1✔
679
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
680
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
681
      this.enableOobLoadReport = enableOobLoadReport;
1✔
682
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
683
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
684
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
685
    }
1✔
686

687
    static final class Builder {
688
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
689
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
690
      boolean enableOobLoadReport = false;
1✔
691
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
692
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
693
      float errorUtilizationPenalty = 1.0F;
1✔
694

695
      private Builder() {
1✔
696

697
      }
1✔
698

699
      @SuppressWarnings("UnusedReturnValue")
700
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
701
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
702
        return this;
1✔
703
      }
704

705
      @SuppressWarnings("UnusedReturnValue")
706
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
707
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
708
        return this;
1✔
709
      }
710

711
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
712
        this.enableOobLoadReport = enableOobLoadReport;
1✔
713
        return this;
1✔
714
      }
715

716
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
717
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
718
        return this;
1✔
719
      }
720

721
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
722
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
723
        return this;
1✔
724
      }
725

726
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
727
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
728
        return this;
1✔
729
      }
730

731
      WeightedRoundRobinLoadBalancerConfig build() {
732
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
733
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
734
                weightUpdatePeriodNanos, errorUtilizationPenalty);
735
      }
736
    }
737
  }
738
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc