• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19184

01 May 2024 10:20PM UTC coverage: 88.322% (+0.003%) from 88.319%
#19184

push

github

web-flow
xds: include the target label to WRR metrics (#11141)

31478 of 35640 relevant lines covered (88.32%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.28
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkElementIndex;
21
import static com.google.common.base.Preconditions.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.Lists;
28
import io.grpc.ConnectivityState;
29
import io.grpc.ConnectivityStateInfo;
30
import io.grpc.Deadline.Ticker;
31
import io.grpc.DoubleHistogramMetricInstrument;
32
import io.grpc.EquivalentAddressGroup;
33
import io.grpc.ExperimentalApi;
34
import io.grpc.LoadBalancer;
35
import io.grpc.LoadBalancerProvider;
36
import io.grpc.LongCounterMetricInstrument;
37
import io.grpc.MetricInstrumentRegistry;
38
import io.grpc.NameResolver;
39
import io.grpc.Status;
40
import io.grpc.SynchronizationContext;
41
import io.grpc.SynchronizationContext.ScheduledHandle;
42
import io.grpc.services.MetricReport;
43
import io.grpc.util.ForwardingLoadBalancerHelper;
44
import io.grpc.util.ForwardingSubchannel;
45
import io.grpc.util.RoundRobinLoadBalancer;
46
import io.grpc.xds.orca.OrcaOobUtil;
47
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
48
import io.grpc.xds.orca.OrcaPerRequestUtil;
49
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
50
import java.util.Collection;
51
import java.util.HashMap;
52
import java.util.HashSet;
53
import java.util.List;
54
import java.util.Map;
55
import java.util.Random;
56
import java.util.Set;
57
import java.util.concurrent.ScheduledExecutorService;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicInteger;
60
import java.util.logging.Level;
61
import java.util.logging.Logger;
62

63
/**
64
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
65
 * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
66
 * determined by backend metrics using ORCA.
67
 */
68
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
69
final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
70

71
  private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
72
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
73
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
74
  private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
75
  private static final Logger log = Logger.getLogger(
1✔
76
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
77
  private WeightedRoundRobinLoadBalancerConfig config;
78
  private final SynchronizationContext syncContext;
79
  private final ScheduledExecutorService timeService;
80
  private ScheduledHandle weightUpdateTimer;
81
  private final Runnable updateWeightTask;
82
  private final AtomicInteger sequence;
83
  private final long infTime;
84
  private final Ticker ticker;
85

86
  // The metric instruments are only registered once and shared by all instances of this LB.
87
  static {
    // All four WRR instruments are registered against the process-wide default registry and
    // pass the same two label-name lists ("grpc.target" and "grpc.lb.locality").
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    RR_FALLBACK_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.rr_fallback",
        "Number of scheduler updates in which there were not enough endpoints with valid "
            + "weight, which caused the WRR policy to fall back to RR behavior",
        "update",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);

    ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
        "Number of endpoints from each scheduler update that don't yet have usable weight "
            + "information",
        "endpoint",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);

    ENDPOINT_WEIGHT_STALE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_stale",
        "Number of endpoints from each scheduler update whose latest weight is older than the "
            + "expiration period",
        "endpoint",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);

    // NOTE(review): the empty list is presumably the histogram bucket boundaries - confirm
    // against MetricInstrumentRegistry.registerDoubleHistogram.
    ENDPOINT_WEIGHTS_HISTOGRAM = registry.registerDoubleHistogram(
        "grpc.lb.wrr.endpoint_weights",
        "The histogram buckets will be endpoint weight ranges.",
        "weight",
        Lists.newArrayList(),
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        true);
  }
1✔
110

111
  /**
   * Creates a balancer whose helper is wrapped first by the ORCA reporting helper and then by
   * {@link WrrHelper}; the starting sequence number comes from a fresh {@link Random}.
   *
   * @param helper the channel-provided helper
   * @param ticker time source used for weight expiration and blackout bookkeeping
   */
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
    this(
        new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)),
        ticker,
        new Random());
  }
1✔
114

115
  /**
   * Primary constructor: registers this balancer with {@code helper} and captures the
   * synchronization context and scheduled executor that drive the periodic weight refresh.
   *
   * @param helper wrapping helper that creates {@link WrrSubchannel}s for this balancer
   * @param ticker time source; must not be null
   * @param random supplies the initial value of the shared pick sequence
   */
  public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) {
    super(helper);
    helper.setLoadBalancer(this);

    this.ticker = checkNotNull(ticker, "ticker");
    // Sentinel stored in nonEmptySince while an endpoint has no usable weight yet.
    this.infTime = ticker.nanoTime() + Long.MAX_VALUE;

    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");

    this.updateWeightTask = new UpdateWeightTask();
    this.sequence = new AtomicInteger(random.nextInt());

    log.fine("weighted_round_robin LB created");
  }
1✔
126

127
  /**
   * Test-only variant of the public two-argument constructor that lets the caller supply the
   * {@link Random} used to seed the pick sequence.
   */
  @VisibleForTesting
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
    this(
        new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)),
        ticker,
        random);
  }
1✔
131

132
  @Override
133
  protected ChildLbState createChildLbState(Object key, Object policyConfig,
134
      SubchannelPicker initialPicker, ResolvedAddresses unused) {
135
    ChildLbState childLbState = new WeightedChildLbState(key, pickFirstLbProvider, policyConfig,
1✔
136
        initialPicker);
137
    return childLbState;
1✔
138
  }
139

140
  @Override
141
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
142
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
143
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
144
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
145
                      + resolvedAddresses.getAddresses()
1✔
146
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
147
      handleNameResolutionError(unavailableStatus);
1✔
148
      return unavailableStatus;
1✔
149
    }
150
    config =
1✔
151
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
152
    AcceptResolvedAddrRetVal acceptRetVal;
153
    try {
154
      resolvingAddresses = true;
1✔
155
      acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
156
      if (!acceptRetVal.status.isOk()) {
1✔
157
        return acceptRetVal.status;
×
158
      }
159

160
      if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
161
        weightUpdateTimer.cancel();
1✔
162
      }
163
      updateWeightTask.run();
1✔
164

165
      createAndApplyOrcaListeners();
1✔
166

167
      // Must update channel picker before return so that new RPCs will not be routed to deleted
168
      // clusters and resolver can remove them in service config.
169
      updateOverallBalancingState();
1✔
170

171
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
172
    } finally {
173
      resolvingAddresses = false;
1✔
174
    }
175

176
    return acceptRetVal.status;
1✔
177
  }
178

179
  @Override
180
  public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
181
    return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
182
        config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper());
1✔
183
  }
184

185
  @VisibleForTesting
186
  final class WeightedChildLbState extends ChildLbState {
187

188
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
189
    private volatile long lastUpdated;
190
    private volatile long nonEmptySince;
191
    private volatile double weight = 0;
1✔
192

193
    private OrcaReportListener orcaReportListener;
194

195
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Object childConfig,
196
        SubchannelPicker initialPicker) {
1✔
197
      super(key, policyProvider, childConfig, initialPicker);
1✔
198
    }
1✔
199

200
    private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
201
      if (config == null) {
1✔
202
        return 0;
×
203
      }
204
      long now = ticker.nanoTime();
1✔
205
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
206
        nonEmptySince = infTime;
1✔
207
        staleEndpoints.incrementAndGet();
1✔
208
        return 0;
1✔
209
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
210
          && config.blackoutPeriodNanos > 0) {
1✔
211
        notYetUsableEndpoints.incrementAndGet();
1✔
212
        return 0;
1✔
213
      } else {
214
        return weight;
1✔
215
      }
216
    }
217

218
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
219
      subchannels.add(wrrSubchannel);
1✔
220
    }
1✔
221

222
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
223
      if (orcaReportListener != null
1✔
224
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
225
        return orcaReportListener;
1✔
226
      }
227
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
228
      return orcaReportListener;
1✔
229
    }
230

231
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
232
      subchannels.remove(wrrSubchannel);
1✔
233
    }
1✔
234

235
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
236
      private final float errorUtilizationPenalty;
237

238
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
239
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
240
      }
1✔
241

242
      @Override
243
      public void onLoadReport(MetricReport report) {
244
        double newWeight = 0;
1✔
245
        // Prefer application utilization and fallback to CPU utilization if unset.
246
        double utilization =
247
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
248
                : report.getCpuUtilization();
1✔
249
        if (utilization > 0 && report.getQps() > 0) {
1✔
250
          double penalty = 0;
1✔
251
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
252
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
253
          }
254
          newWeight = report.getQps() / (utilization + penalty);
1✔
255
        }
256
        if (newWeight == 0) {
1✔
257
          return;
1✔
258
        }
259
        if (nonEmptySince == infTime) {
1✔
260
          nonEmptySince = ticker.nanoTime();
1✔
261
        }
262
        lastUpdated = ticker.nanoTime();
1✔
263
        weight = newWeight;
1✔
264
      }
1✔
265
    }
266
  }
267

268
  private final class UpdateWeightTask implements Runnable {
1✔
269
    @Override
270
    public void run() {
271
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
272
        ((WeightedRoundRobinPicker) currentPicker).updateWeight();
1✔
273
      }
274
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
275
          TimeUnit.NANOSECONDS, timeService);
1✔
276
    }
1✔
277
  }
278

279
  private void createAndApplyOrcaListeners() {
280
    for (ChildLbState child : getChildLbStates()) {
1✔
281
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
282
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
283
        if (config.enableOobLoadReport) {
1✔
284
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
285
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
286
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
287
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
288
                  .build());
1✔
289
        } else {
290
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
291
        }
292
      }
1✔
293
    }
1✔
294
  }
1✔
295

296
  @Override
297
  public void shutdown() {
298
    if (weightUpdateTimer != null) {
1✔
299
      weightUpdateTimer.cancel();
1✔
300
    }
301
    super.shutdown();
1✔
302
  }
1✔
303

304
  private static final class WrrHelper extends ForwardingLoadBalancerHelper {
305
    private final Helper delegate;
306
    private WeightedRoundRobinLoadBalancer wrr;
307

308
    WrrHelper(Helper helper) {
1✔
309
      this.delegate = helper;
1✔
310
    }
1✔
311

312
    void setLoadBalancer(WeightedRoundRobinLoadBalancer lb) {
313
      this.wrr = lb;
1✔
314
    }
1✔
315

316
    @Override
317
    protected Helper delegate() {
318
      return delegate;
1✔
319
    }
320

321
    @Override
322
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
323
      checkElementIndex(0, args.getAddresses().size(), "Empty address group");
1✔
324
      WeightedChildLbState childLbState =
1✔
325
          (WeightedChildLbState) wrr.getChildLbStateEag(args.getAddresses().get(0));
1✔
326
      return wrr.new WrrSubchannel(delegate().createSubchannel(args), childLbState);
1✔
327
    }
328
  }
329

330
  @VisibleForTesting
331
  final class WrrSubchannel extends ForwardingSubchannel {
332
    private final Subchannel delegate;
333
    private final WeightedChildLbState owner;
334

335
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
336
      this.delegate = checkNotNull(delegate, "delegate");
1✔
337
      this.owner = checkNotNull(owner, "owner");
1✔
338
    }
1✔
339

340
    @Override
341
    public void start(SubchannelStateListener listener) {
342
      owner.addSubchannel(this);
1✔
343
      delegate().start(new SubchannelStateListener() {
1✔
344
        @Override
345
        public void onSubchannelState(ConnectivityStateInfo newState) {
346
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
347
            owner.nonEmptySince = infTime;
1✔
348
          }
349
          listener.onSubchannelState(newState);
1✔
350
        }
1✔
351
      });
352
    }
1✔
353

354
    @Override
355
    protected Subchannel delegate() {
356
      return delegate;
1✔
357
    }
358

359
    @Override
360
    public void shutdown() {
361
      super.shutdown();
1✔
362
      owner.removeSubchannel(this);
1✔
363
    }
1✔
364
  }
365

366
  @VisibleForTesting
367
  static final class WeightedRoundRobinPicker extends SubchannelPicker {
368
    private final List<ChildLbState> children;
369
    private final Map<Subchannel, OrcaPerRequestReportListener> subchannelToReportListenerMap =
1✔
370
        new HashMap<>();
371
    private final boolean enableOobLoadReport;
372
    private final float errorUtilizationPenalty;
373
    private final AtomicInteger sequence;
374
    private final int hashCode;
375
    private final LoadBalancer.Helper helper;
376
    private volatile StaticStrideScheduler scheduler;
377

378
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
379
        float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper) {
1✔
380
      checkNotNull(children, "children");
1✔
381
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
382
      this.children = children;
1✔
383
      for (ChildLbState child : children) {
1✔
384
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
385
        for (WrrSubchannel subchannel : wChild.subchannels) {
1✔
386
          this.subchannelToReportListenerMap
1✔
387
              .put(subchannel, wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
388
        }
1✔
389
      }
1✔
390
      this.enableOobLoadReport = enableOobLoadReport;
1✔
391
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
392
      this.sequence = checkNotNull(sequence, "sequence");
1✔
393
      this.helper = helper;
1✔
394

395
      // For equality we treat children as a set; use hash code as defined by Set
396
      int sum = 0;
1✔
397
      for (ChildLbState child : children) {
1✔
398
        sum += child.hashCode();
1✔
399
      }
1✔
400
      this.hashCode = sum
1✔
401
          ^ Boolean.hashCode(enableOobLoadReport)
1✔
402
          ^ Float.hashCode(errorUtilizationPenalty);
1✔
403

404
      updateWeight();
1✔
405
    }
1✔
406

407
    @Override
408
    public PickResult pickSubchannel(PickSubchannelArgs args) {
409
      ChildLbState childLbState = children.get(scheduler.pick());
1✔
410
      WeightedChildLbState wChild = (WeightedChildLbState) childLbState;
1✔
411
      PickResult pickResult = childLbState.getCurrentPicker().pickSubchannel(args);
1✔
412
      Subchannel subchannel = pickResult.getSubchannel();
1✔
413
      if (subchannel == null) {
1✔
414
        return pickResult;
1✔
415
      }
416
      if (!enableOobLoadReport) {
1✔
417
        return PickResult.withSubchannel(subchannel,
1✔
418
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
419
                subchannelToReportListenerMap.getOrDefault(subchannel,
1✔
420
                    wChild.getOrCreateOrcaListener(errorUtilizationPenalty))));
1✔
421
      } else {
422
        return PickResult.withSubchannel(subchannel);
1✔
423
      }
424
    }
425

426
    private void updateWeight() {
427
      float[] newWeights = new float[children.size()];
1✔
428
      AtomicInteger staleEndpoints = new AtomicInteger();
1✔
429
      AtomicInteger notYetUsableEndpoints = new AtomicInteger();
1✔
430
      for (int i = 0; i < children.size(); i++) {
1✔
431
        double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(staleEndpoints,
1✔
432
            notYetUsableEndpoints);
433
        // TODO: add locality label once available
434
        helper.getMetricRecorder()
1✔
435
            .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
1✔
436
                ImmutableList.of(helper.getChannelTarget()),
1✔
437
                ImmutableList.of(""));
1✔
438
        newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
439
      }
440
      if (staleEndpoints.get() > 0) {
1✔
441
        // TODO: add locality label once available
442
        helper.getMetricRecorder()
1✔
443
            .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
1✔
444
                ImmutableList.of(helper.getChannelTarget()),
1✔
445
                ImmutableList.of(""));
1✔
446
      }
447
      if (notYetUsableEndpoints.get() > 0) {
1✔
448
        // TODO: add locality label once available
449
        helper.getMetricRecorder()
1✔
450
            .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
1✔
451
                ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(""));
1✔
452
      }
453

454
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
455
      if (this.scheduler.usesRoundRobin()) {
1✔
456
        // TODO: locality label once available
457
        helper.getMetricRecorder()
1✔
458
            .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
1✔
459
                ImmutableList.of(""));
1✔
460
      }
461
    }
1✔
462

463
    @Override
464
    public String toString() {
465
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
466
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
467
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
468
          .add("list", children).toString();
1✔
469
    }
470

471
    @VisibleForTesting
472
    List<ChildLbState> getChildren() {
473
      return children;
1✔
474
    }
475

476
    @Override
477
    public int hashCode() {
478
      return hashCode;
×
479
    }
480

481
    @Override
482
    public boolean equals(Object o) {
483
      if (!(o instanceof WeightedRoundRobinPicker)) {
1✔
484
        return false;
×
485
      }
486
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) o;
1✔
487
      if (other == this) {
1✔
488
        return true;
×
489
      }
490
      // the lists cannot contain duplicate subchannels
491
      return hashCode == other.hashCode
1✔
492
          && sequence == other.sequence
493
          && enableOobLoadReport == other.enableOobLoadReport
494
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
495
          && children.size() == other.children.size()
1✔
496
          && new HashSet<>(children).containsAll(other.children);
1✔
497
    }
498
  }
499

500
  /*
501
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
502
   * in which each object's deadline is the multiplicative inverse of the object's weight.
503
   * <p>
504
   * The way in which this is implemented is through a static stride scheduler. 
505
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
506
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
507
   * with higher weights. It is based on the observation that the intended sequence generated 
508
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
509
   * The Static Stride Scheduler is more performant than other implementations of the EDF
510
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
511
   * <p>
512
   * go/static-stride-scheduler
513
   * <p>
514
   *
515
   * <ul>
516
   *  <li>nextSequence() - O(1)
517
   *  <li>pick() - O(n)
518
   */
519
  @VisibleForTesting
520
  static final class StaticStrideScheduler {
521
    private final short[] scaledWeights;
522
    private final AtomicInteger sequence;
523
    private final boolean usesRoundRobin;
524
    private static final int K_MAX_WEIGHT = 0xFFFF;
525

526
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
527
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
528
    //
529
    // This is done as a performance optimization by limiting the number of rounds for picks
530
    // for edge cases where channels have large differences in subchannel weights.
531
    // In this case, without these clips, it would potentially require the scheduler to
532
    // frequently traverse through the entire subchannel list within the pick method.
533
    //
534
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
535
    // decrease the amount of sequences that the scheduler must traverse through in order
536
    // to pick a high weight subchannel in such corner cases.
537
    // But, it also makes WeightedRoundRobin to send slightly more requests to
538
    // potentially very bad tasks (that would have near-zero weights) than zero.
539
    // This is not necessarily a downside, though. Perhaps this is not a problem at
540
    // all, and we can increase this value if needed to save CPU cycles.
541
    private static final double K_MAX_RATIO = 10;
542
    private static final double K_MIN_RATIO = 0.1;
543

544
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
545
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
546
      int numChannels = weights.length;
1✔
547
      int numWeightedChannels = 0;
1✔
548
      double sumWeight = 0;
1✔
549
      double unscaledMeanWeight;
550
      float unscaledMaxWeight = 0;
1✔
551
      for (float weight : weights) {
1✔
552
        if (weight > 0) {
1✔
553
          sumWeight += weight;
1✔
554
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
555
          numWeightedChannels++;
1✔
556
        }
557
      }
558

559
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
560
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
561
      if (numWeightedChannels > 0) {
1✔
562
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
563
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
564
        usesRoundRobin = false;
1✔
565
      } else {
566
        // Fall back to round robin if all values are non-positives
567
        usesRoundRobin = true;
1✔
568
        unscaledMeanWeight = 1;
1✔
569
        unscaledMaxWeight = 1;
1✔
570
      }
571

572
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
573
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
574
      // match the actual mean of the values that end up in the scheduler.
575
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
576
      // We compute weightLowerBound and clamp it to 1 from below so that in the
577
      // worst case, we represent tiny weights as 1.
578
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
579
      short[] scaledWeights = new short[numChannels];
1✔
580
      for (int i = 0; i < numChannels; i++) {
1✔
581
        if (weights[i] <= 0) {
1✔
582
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
583
        } else {
584
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
585
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
586
        }
587
      }
588

589
      this.scaledWeights = scaledWeights;
1✔
590
      this.sequence = sequence;
1✔
591
    }
1✔
592

593
    // Without properly weighted channels, we do plain vanilla round_robin.
594
    boolean usesRoundRobin() {
595
      return usesRoundRobin;
1✔
596
    }
597

598
    /**
599
     * Returns the next sequence number and atomically increases sequence with wraparound.
600
     */
601
    private long nextSequence() {
602
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
603
    }
604

605
    /*
606
     * Selects index of next backend server.
607
     * <p>
608
     * A 2D array is compactly represented as a function of W(backend), where the row
609
     * represents the generation and the column represents the backend index:
610
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
611
     * Each element in the conceptual array is a boolean indicating whether the backend at
612
     * this index should be picked now. If false, the counter is incremented again,
613
     * and the new element is checked. An atomically incremented counter keeps track of our
614
     * backend and generation through modular arithmetic within the pick() method.
615
     * <p>
616
     * Modular arithmetic allows us to evenly distribute picks and skips between
617
     * generations based on W(backend).
618
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
619
     * If we have the same three backends with weights:
620
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
621
     * <p>
622
     * B0    B1    B2
623
     * T     T     T
624
     * F     F     T
625
     * F     T     T
626
     * T     F     T
627
     * F     T     T
628
     * F     F     T
629
     * The sequence of picked backend indices is given by
630
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
631
     * <p>
632
     * To reduce the variance and spread the wasted work among different picks,
633
     * an offset that varies per backend index is also included to the calculation.
634
     */
635
    int pick() {
636
      while (true) {
637
        long sequence = this.nextSequence();
1✔
638
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
639
        long generation = sequence / scaledWeights.length;
1✔
640
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
641
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
642
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
643
          continue;
1✔
644
        }
645
        return backendIndex;
1✔
646
      }
647
    }
648
  }
649

650
  static final class WeightedRoundRobinLoadBalancerConfig {
651
    final long blackoutPeriodNanos;
652
    final long weightExpirationPeriodNanos;
653
    final boolean enableOobLoadReport;
654
    final long oobReportingPeriodNanos;
655
    final long weightUpdatePeriodNanos;
656
    final float errorUtilizationPenalty;
657

658
    public static Builder newBuilder() {
659
      return new Builder();
1✔
660
    }
661

662
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
663
                                                 long weightExpirationPeriodNanos,
664
                                                 boolean enableOobLoadReport,
665
                                                 long oobReportingPeriodNanos,
666
                                                 long weightUpdatePeriodNanos,
667
                                                 float errorUtilizationPenalty) {
1✔
668
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
669
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
670
      this.enableOobLoadReport = enableOobLoadReport;
1✔
671
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
672
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
673
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
674
    }
1✔
675

676
    static final class Builder {
677
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
678
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
679
      boolean enableOobLoadReport = false;
1✔
680
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
681
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
682
      float errorUtilizationPenalty = 1.0F;
1✔
683

684
      private Builder() {
1✔
685

686
      }
1✔
687

688
      @SuppressWarnings("UnusedReturnValue")
689
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
690
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
691
        return this;
1✔
692
      }
693

694
      @SuppressWarnings("UnusedReturnValue")
695
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
696
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
697
        return this;
1✔
698
      }
699

700
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
701
        this.enableOobLoadReport = enableOobLoadReport;
1✔
702
        return this;
1✔
703
      }
704

705
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
706
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
707
        return this;
1✔
708
      }
709

710
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
711
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
712
        return this;
1✔
713
      }
714

715
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
716
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
717
        return this;
1✔
718
      }
719

720
      WeightedRoundRobinLoadBalancerConfig build() {
721
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
722
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
723
                weightUpdatePeriodNanos, errorUtilizationPenalty);
724
      }
725
    }
726
  }
727
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc