• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19954

22 Aug 2025 03:07PM UTC coverage: 88.566% (+0.01%) from 88.553%
#19954

push

github

web-flow
xds: Implement equals in WRRLBConfig

Just as in a8de9f0, the lack of equals causes cluster_resolver to consider every update a different configuration and restart itself.

Handling NaN should really be prevented with validation, but it looks like that
would lead to yak shaving at the moment.

b/435208946

34710 of 39191 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.48
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.Lists;
27
import io.grpc.ConnectivityState;
28
import io.grpc.ConnectivityStateInfo;
29
import io.grpc.Deadline.Ticker;
30
import io.grpc.DoubleHistogramMetricInstrument;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.LongCounterMetricInstrument;
35
import io.grpc.MetricInstrumentRegistry;
36
import io.grpc.NameResolver;
37
import io.grpc.Status;
38
import io.grpc.SynchronizationContext;
39
import io.grpc.SynchronizationContext.ScheduledHandle;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingSubchannel;
42
import io.grpc.util.MultiChildLoadBalancer;
43
import io.grpc.xds.orca.OrcaOobUtil;
44
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
45
import io.grpc.xds.orca.OrcaPerRequestUtil;
46
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
47
import java.util.ArrayList;
48
import java.util.Collection;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Objects;
52
import java.util.Random;
53
import java.util.Set;
54
import java.util.concurrent.ScheduledExecutorService;
55
import java.util.concurrent.TimeUnit;
56
import java.util.concurrent.atomic.AtomicInteger;
57
import java.util.logging.Level;
58
import java.util.logging.Logger;
59

60
/**
61
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
62
 * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
63
 * determined by backend metrics using ORCA.
64
 * To use WRR, users may configure through channel serviceConfig. Example config:
65
 * <pre> {@code
66
 *       String wrrConfig = "{\"loadBalancingConfig\":" +
67
 *           "[{\"weighted_round_robin\":{\"enableOobLoadReport\":true, " +
68
 *           "\"blackoutPeriod\":\"10s\"," +
69
 *           "\"oobReportingPeriod\":\"10s\"," +
70
 *           "\"weightExpirationPeriod\":\"180s\"," +
71
 *           "\"errorUtilizationPenalty\":\"1.0\"," +
72
 *           "\"weightUpdatePeriod\":\"1s\"}}]}";
73
 *        serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
74
 *        channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
75
 *            .defaultServiceConfig(serviceConfig)
76
 *            .build();
77
 *  }
78
 *  </pre>
79
 *  Users may also configure through xDS control plane via custom lb policy. But that is much more
80
 *  complex to set up. Example config:
81
 *  <pre>
82
 *  localityLbPolicies:
83
 *   - customPolicy:
84
 *       name: weighted_round_robin
85
 *       data: '{ "enableOobLoadReport": true }'
86
 *  </pre>
87
 *  See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config
88
 */
89
final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
90

91
  private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
92
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
93
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
94
  private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
95
  private static final Logger log = Logger.getLogger(
1✔
96
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
97
  private WeightedRoundRobinLoadBalancerConfig config;
98
  private final SynchronizationContext syncContext;
99
  private final ScheduledExecutorService timeService;
100
  private ScheduledHandle weightUpdateTimer;
101
  private final Runnable updateWeightTask;
102
  private final AtomicInteger sequence;
103
  private final long infTime;
104
  private final Ticker ticker;
105
  private String locality = "";
1✔
106
  private String backendService = "";
1✔
107
  private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
108

109
  // The metric instruments are only registered once and shared by all instances of this LB.
110
  static {
111
    MetricInstrumentRegistry metricInstrumentRegistry
112
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
113
    RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
114
        "grpc.lb.wrr.rr_fallback",
115
        "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
116
            + "with valid weight, which caused the WRR policy to fall back to RR behavior",
117
        "{update}",
118
        Lists.newArrayList("grpc.target"),
1✔
119
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
120
        false);
121
    ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
122
        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
123
        "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable "
124
            + "weight information",
125
        "{endpoint}",
126
        Lists.newArrayList("grpc.target"),
1✔
127
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
128
        false);
129
    ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
130
        "grpc.lb.wrr.endpoint_weight_stale",
131
        "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
132
            + "older than the expiration period",
133
        "{endpoint}",
134
        Lists.newArrayList("grpc.target"),
1✔
135
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
136
        false);
137
    ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
1✔
138
        "grpc.lb.wrr.endpoint_weights",
139
        "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
140
        "{weight}",
141
        Lists.newArrayList(),
1✔
142
        Lists.newArrayList("grpc.target"),
1✔
143
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
144
        false);
145
  }
1✔
146

147
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
148
    this(helper, ticker, new Random());
1✔
149
  }
1✔
150

151
  @VisibleForTesting
152
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
153
    super(OrcaOobUtil.newOrcaReportingHelper(helper));
1✔
154
    this.ticker = checkNotNull(ticker, "ticker");
1✔
155
    this.infTime = ticker.nanoTime() + Long.MAX_VALUE;
1✔
156
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
157
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
158
    this.updateWeightTask = new UpdateWeightTask();
1✔
159
    this.sequence = new AtomicInteger(random.nextInt());
1✔
160
    log.log(Level.FINE, "weighted_round_robin LB created");
1✔
161
  }
1✔
162

163
  @Override
164
  protected ChildLbState createChildLbState(Object key) {
165
    return new WeightedChildLbState(key, pickFirstLbProvider);
1✔
166
  }
167

168
  @Override
169
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
170
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
171
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
172
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
173
                      + resolvedAddresses.getAddresses()
1✔
174
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
175
      handleNameResolutionError(unavailableStatus);
1✔
176
      return unavailableStatus;
1✔
177
    }
178
    String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
1✔
179
    if (locality != null) {
1✔
180
      this.locality = locality;
1✔
181
    } else {
182
      this.locality = "";
1✔
183
    }
184
    String backendService
1✔
185
        = resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE);
1✔
186
    if (backendService != null) {
1✔
187
      this.backendService = backendService;
1✔
188
    } else {
189
      this.backendService = "";
1✔
190
    }
191
    config =
1✔
192
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
193

194
    if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
195
      weightUpdateTimer.cancel();
1✔
196
    }
197
    updateWeightTask.run();
1✔
198

199
    Status status = super.acceptResolvedAddresses(resolvedAddresses);
1✔
200

201
    createAndApplyOrcaListeners();
1✔
202

203
    return status;
1✔
204
  }
205

206
  /**
207
   * Updates picker with the list of active subchannels (state == READY).
208
   */
209
  @Override
210
  protected void updateOverallBalancingState() {
211
    List<ChildLbState> activeList = getReadyChildren();
1✔
212
    if (activeList.isEmpty()) {
1✔
213
      // No READY subchannels
214

215
      // MultiChildLB will request connection immediately on subchannel IDLE.
216
      boolean isConnecting = false;
1✔
217
      for (ChildLbState childLbState : getChildLbStates()) {
1✔
218
        ConnectivityState state = childLbState.getCurrentState();
1✔
219
        if (state == ConnectivityState.CONNECTING || state == ConnectivityState.IDLE) {
1✔
220
          isConnecting = true;
1✔
221
          break;
1✔
222
        }
223
      }
1✔
224

225
      if (isConnecting) {
1✔
226
        updateBalancingState(
1✔
227
            ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
228
      } else {
229
        updateBalancingState(
1✔
230
            ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
1✔
231
      }
232
    } else {
1✔
233
      updateBalancingState(ConnectivityState.READY, createReadyPicker(activeList));
1✔
234
    }
235
  }
1✔
236

237
  private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
238
    WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
239
        config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
240
    updateWeight(picker);
1✔
241
    return picker;
1✔
242
  }
243

244
  private void updateWeight(WeightedRoundRobinPicker picker) {
245
    Helper helper = getHelper();
1✔
246
    float[] newWeights = new float[picker.children.size()];
1✔
247
    AtomicInteger staleEndpoints = new AtomicInteger();
1✔
248
    AtomicInteger notYetUsableEndpoints = new AtomicInteger();
1✔
249
    for (int i = 0; i < picker.children.size(); i++) {
1✔
250
      double newWeight = ((WeightedChildLbState) picker.children.get(i)).getWeight(staleEndpoints,
1✔
251
          notYetUsableEndpoints);
252
      helper.getMetricRecorder()
1✔
253
          .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
1✔
254
              ImmutableList.of(helper.getChannelTarget()),
1✔
255
              ImmutableList.of(locality, backendService));
1✔
256
      newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
257
    }
258

259
    if (staleEndpoints.get() > 0) {
1✔
260
      helper.getMetricRecorder()
1✔
261
          .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
1✔
262
              ImmutableList.of(helper.getChannelTarget()),
1✔
263
              ImmutableList.of(locality, backendService));
1✔
264
    }
265
    if (notYetUsableEndpoints.get() > 0) {
1✔
266
      helper.getMetricRecorder()
1✔
267
          .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
1✔
268
              ImmutableList.of(helper.getChannelTarget()),
1✔
269
              ImmutableList.of(locality, backendService));
1✔
270
    }
271
    boolean weightsEffective = picker.updateWeight(newWeights);
1✔
272
    if (!weightsEffective) {
1✔
273
      helper.getMetricRecorder()
1✔
274
          .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
1✔
275
              ImmutableList.of(locality, backendService));
1✔
276
    }
277
  }
1✔
278

279
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
280
    if (state != currentConnectivityState || !picker.equals(currentPicker)) {
1✔
281
      getHelper().updateBalancingState(state, picker);
1✔
282
      currentConnectivityState = state;
1✔
283
      currentPicker = picker;
1✔
284
    }
285
  }
1✔
286

287
  @VisibleForTesting
288
  final class WeightedChildLbState extends ChildLbState {
289

290
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
291
    private volatile long lastUpdated;
292
    private volatile long nonEmptySince;
293
    private volatile double weight = 0;
1✔
294

295
    private OrcaReportListener orcaReportListener;
296

297
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider) {
1✔
298
      super(key, policyProvider);
1✔
299
    }
1✔
300

301
    @Override
302
    protected ChildLbStateHelper createChildHelper() {
303
      return new WrrChildLbStateHelper();
1✔
304
    }
305

306
    private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
307
      if (config == null) {
1✔
308
        return 0;
×
309
      }
310
      long now = ticker.nanoTime();
1✔
311
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
312
        nonEmptySince = infTime;
1✔
313
        staleEndpoints.incrementAndGet();
1✔
314
        return 0;
1✔
315
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
316
          && config.blackoutPeriodNanos > 0) {
1✔
317
        notYetUsableEndpoints.incrementAndGet();
1✔
318
        return 0;
1✔
319
      } else {
320
        return weight;
1✔
321
      }
322
    }
323

324
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
325
      subchannels.add(wrrSubchannel);
1✔
326
    }
1✔
327

328
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
329
      if (orcaReportListener != null
1✔
330
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
331
        return orcaReportListener;
1✔
332
      }
333
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
334
      return orcaReportListener;
1✔
335
    }
336

337
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
338
      subchannels.remove(wrrSubchannel);
1✔
339
    }
1✔
340

341
    final class WrrChildLbStateHelper extends ChildLbStateHelper {
1✔
342
      @Override
343
      public Subchannel createSubchannel(CreateSubchannelArgs args) {
344
        return new WrrSubchannel(super.createSubchannel(args), WeightedChildLbState.this);
1✔
345
      }
346

347
      @Override
348
      public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
349
        super.updateBalancingState(newState, newPicker);
1✔
350
        if (!resolvingAddresses && newState == ConnectivityState.IDLE) {
1✔
351
          getLb().requestConnection();
×
352
        }
353
      }
1✔
354
    }
355

356
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
357
      private final float errorUtilizationPenalty;
358

359
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
360
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
361
      }
1✔
362

363
      @Override
364
      public void onLoadReport(MetricReport report) {
365
        double newWeight = 0;
1✔
366
        // Prefer application utilization and fallback to CPU utilization if unset.
367
        double utilization =
368
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
369
                : report.getCpuUtilization();
1✔
370
        if (utilization > 0 && report.getQps() > 0) {
1✔
371
          double penalty = 0;
1✔
372
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
373
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
374
          }
375
          newWeight = report.getQps() / (utilization + penalty);
1✔
376
        }
377
        if (newWeight == 0) {
1✔
378
          return;
1✔
379
        }
380
        if (nonEmptySince == infTime) {
1✔
381
          nonEmptySince = ticker.nanoTime();
1✔
382
        }
383
        lastUpdated = ticker.nanoTime();
1✔
384
        weight = newWeight;
1✔
385
      }
1✔
386
    }
387
  }
388

389
  private final class UpdateWeightTask implements Runnable {
1✔
390
    @Override
391
    public void run() {
392
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
393
        updateWeight((WeightedRoundRobinPicker) currentPicker);
1✔
394
      }
395
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
396
          TimeUnit.NANOSECONDS, timeService);
1✔
397
    }
1✔
398
  }
399

400
  private void createAndApplyOrcaListeners() {
401
    for (ChildLbState child : getChildLbStates()) {
1✔
402
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
403
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
404
        if (config.enableOobLoadReport) {
1✔
405
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
406
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
407
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
408
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
409
                  .build());
1✔
410
        } else {
411
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
412
        }
413
      }
1✔
414
    }
1✔
415
  }
1✔
416

417
  @Override
418
  public void shutdown() {
419
    if (weightUpdateTimer != null) {
1✔
420
      weightUpdateTimer.cancel();
1✔
421
    }
422
    super.shutdown();
1✔
423
  }
1✔
424

425
  @VisibleForTesting
426
  final class WrrSubchannel extends ForwardingSubchannel {
427
    private final Subchannel delegate;
428
    private final WeightedChildLbState owner;
429

430
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
431
      this.delegate = checkNotNull(delegate, "delegate");
1✔
432
      this.owner = checkNotNull(owner, "owner");
1✔
433
    }
1✔
434

435
    @Override
436
    public void start(SubchannelStateListener listener) {
437
      owner.addSubchannel(this);
1✔
438
      delegate().start(new SubchannelStateListener() {
1✔
439
        @Override
440
        public void onSubchannelState(ConnectivityStateInfo newState) {
441
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
442
            owner.nonEmptySince = infTime;
1✔
443
          }
444
          listener.onSubchannelState(newState);
1✔
445
        }
1✔
446
      });
447
    }
1✔
448

449
    @Override
450
    protected Subchannel delegate() {
451
      return delegate;
1✔
452
    }
453

454
    @Override
455
    public void shutdown() {
456
      super.shutdown();
1✔
457
      owner.removeSubchannel(this);
1✔
458
    }
1✔
459
  }
460

461
  @VisibleForTesting
462
  static final class WeightedRoundRobinPicker extends SubchannelPicker {
463
    // Parallel lists (column-based storage instead of normal row-based storage of List<Struct>).
464
    // The ith element of children corresponds to the ith element of pickers, listeners, and even
465
    // updateWeight(float[]).
466
    private final List<ChildLbState> children; // May only be accessed from sync context
467
    private final List<SubchannelPicker> pickers;
468
    private final List<OrcaPerRequestReportListener> reportListeners;
469
    private final boolean enableOobLoadReport;
470
    private final float errorUtilizationPenalty;
471
    private final AtomicInteger sequence;
472
    private final int hashCode;
473
    private volatile StaticStrideScheduler scheduler;
474

475
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
476
        float errorUtilizationPenalty, AtomicInteger sequence) {
1✔
477
      checkNotNull(children, "children");
1✔
478
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
479
      this.children = children;
1✔
480
      List<SubchannelPicker> pickers = new ArrayList<>(children.size());
1✔
481
      List<OrcaPerRequestReportListener> reportListeners = new ArrayList<>(children.size());
1✔
482
      for (ChildLbState child : children) {
1✔
483
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
484
        pickers.add(wChild.getCurrentPicker());
1✔
485
        reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
486
      }
1✔
487
      this.pickers = pickers;
1✔
488
      this.reportListeners = reportListeners;
1✔
489
      this.enableOobLoadReport = enableOobLoadReport;
1✔
490
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
491
      this.sequence = checkNotNull(sequence, "sequence");
1✔
492

493
      // For equality we treat pickers as a set; use hash code as defined by Set
494
      int sum = 0;
1✔
495
      for (SubchannelPicker picker : pickers) {
1✔
496
        sum += picker.hashCode();
1✔
497
      }
1✔
498
      this.hashCode = sum
1✔
499
          ^ Boolean.hashCode(enableOobLoadReport)
1✔
500
          ^ Float.hashCode(errorUtilizationPenalty);
1✔
501
    }
1✔
502

503
    @Override
504
    public PickResult pickSubchannel(PickSubchannelArgs args) {
505
      int pick = scheduler.pick();
1✔
506
      PickResult pickResult = pickers.get(pick).pickSubchannel(args);
1✔
507
      Subchannel subchannel = pickResult.getSubchannel();
1✔
508
      if (subchannel == null) {
1✔
509
        return pickResult;
1✔
510
      }
511
      if (!enableOobLoadReport) {
1✔
512
        return PickResult.withSubchannel(subchannel,
1✔
513
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
514
                reportListeners.get(pick)));
1✔
515
      } else {
516
        return PickResult.withSubchannel(subchannel);
1✔
517
      }
518
    }
519

520
    /** Returns {@code true} if weights are different than round_robin. */
521
    private boolean updateWeight(float[] newWeights) {
522
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
523
      return !this.scheduler.usesRoundRobin();
1✔
524
    }
525

526
    @Override
527
    public String toString() {
528
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
529
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
530
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
531
          .add("pickers", pickers)
1✔
532
          .toString();
1✔
533
    }
534

535
    @VisibleForTesting
536
    List<ChildLbState> getChildren() {
537
      return children;
1✔
538
    }
539

540
    @Override
541
    public int hashCode() {
542
      return hashCode;
×
543
    }
544

545
    @Override
546
    public boolean equals(Object o) {
547
      if (!(o instanceof WeightedRoundRobinPicker)) {
1✔
548
        return false;
×
549
      }
550
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) o;
1✔
551
      if (other == this) {
1✔
552
        return true;
×
553
      }
554
      // the lists cannot contain duplicate subchannels
555
      return hashCode == other.hashCode
1✔
556
          && sequence == other.sequence
557
          && enableOobLoadReport == other.enableOobLoadReport
558
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
559
          && pickers.size() == other.pickers.size()
1✔
560
          && new HashSet<>(pickers).containsAll(other.pickers);
1✔
561
    }
562
  }
563

564
  /*
565
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
566
   * in which each object's deadline is the multiplicative inverse of the object's weight.
567
   * <p>
568
   * The way in which this is implemented is through a static stride scheduler. 
569
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
570
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
571
   * with higher weights. It is based on the observation that the intended sequence generated 
572
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
573
   * The Static Stride Scheduler is more performant than other implementations of the EDF
574
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
575
   * <p>
576
   * go/static-stride-scheduler
577
   * <p>
578
   *
579
   * <ul>
580
   *  <li>nextSequence() - O(1)
581
   *  <li>pick() - O(n)
582
   */
583
  @VisibleForTesting
584
  static final class StaticStrideScheduler {
585
    private final short[] scaledWeights;
586
    private final AtomicInteger sequence;
587
    private final boolean usesRoundRobin;
588
    private static final int K_MAX_WEIGHT = 0xFFFF;
589

590
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
591
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
592
    //
593
    // This is done as a performance optimization by limiting the number of rounds for picks
594
    // for edge cases where channels have large differences in subchannel weights.
595
    // In this case, without these clips, it would potentially require the scheduler to
596
    // frequently traverse through the entire subchannel list within the pick method.
597
    //
598
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
599
    // decrease the number of sequences that the scheduler must traverse through in order
600
    // to pick a high weight subchannel in such corner cases.
601
    // But it also makes WeightedRoundRobin send slightly more requests to
602
    // potentially very bad tasks (that would have near-zero weights) than zero.
603
    // This is not necessarily a downside, though. Perhaps this is not a problem at
604
    // all, and we can increase this value if needed to save CPU cycles.
605
    private static final double K_MAX_RATIO = 10;
606
    private static final double K_MIN_RATIO = 0.1;
607

608
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
609
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
610
      int numChannels = weights.length;
1✔
611
      int numWeightedChannels = 0;
1✔
612
      double sumWeight = 0;
1✔
613
      double unscaledMeanWeight;
614
      float unscaledMaxWeight = 0;
1✔
615
      for (float weight : weights) {
1✔
616
        if (weight > 0) {
1✔
617
          sumWeight += weight;
1✔
618
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
619
          numWeightedChannels++;
1✔
620
        }
621
      }
622

623
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
624
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
625
      if (numWeightedChannels > 0) {
1✔
626
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
627
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
628
      } else {
629
        // Fall back to round robin if all values are non-positive. Note that
630
        // numWeightedChannels == 1 also behaves like RR because the weights are all the same, but
631
        // the weights aren't 1, so it doesn't go through this path.
632
        unscaledMeanWeight = 1;
1✔
633
        unscaledMaxWeight = 1;
1✔
634
      }
635
      // We need at least two weights for WRR to be distinguishable from round_robin.
636
      usesRoundRobin = numWeightedChannels < 2;
1✔
637

638
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
639
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
640
      // match the actual mean of the values that end up in the scheduler.
641
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
642
      // We compute weightLowerBound and clamp it to 1 from below so that in the
643
      // worst case, we represent tiny weights as 1.
644
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
645
      short[] scaledWeights = new short[numChannels];
1✔
646
      for (int i = 0; i < numChannels; i++) {
1✔
647
        if (weights[i] <= 0) {
1✔
648
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
649
        } else {
650
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
651
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
652
        }
653
      }
654

655
      this.scaledWeights = scaledWeights;
1✔
656
      this.sequence = sequence;
1✔
657
    }
1✔
658

659
    // Without properly weighted channels, we do plain vanilla round_robin.
660
    boolean usesRoundRobin() {
661
      return usesRoundRobin;
1✔
662
    }
663

664
    /**
665
     * Returns the next sequence number and atomically increases sequence with wraparound.
666
     */
667
    private long nextSequence() {
668
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
669
    }
670

671
    /*
672
     * Selects index of next backend server.
673
     * <p>
674
     * A 2D array is compactly represented as a function of W(backend), where the row
675
     * represents the generation and the column represents the backend index:
676
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
677
     * Each element in the conceptual array is a boolean indicating whether the backend at
678
     * this index should be picked now. If false, the counter is incremented again,
679
     * and the new element is checked. An atomically incremented counter keeps track of our
680
     * backend and generation through modular arithmetic within the pick() method.
681
     * <p>
682
     * Modular arithmetic allows us to evenly distribute picks and skips between
683
     * generations based on W(backend).
684
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
685
     * If we have the same three backends with weights:
686
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
687
     * <p>
688
     * B0    B1    B2
689
     * T     T     T
690
     * F     F     T
691
     * F     T     T
692
     * T     F     T
693
     * F     T     T
694
     * F     F     T
695
     * The sequence of picked backend indices is given by
696
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
697
     * <p>
698
     * To reduce the variance and spread the wasted work among different picks,
699
     * an offset that varies per backend index is also included to the calculation.
700
     */
701
    int pick() {
702
      while (true) {
703
        long sequence = this.nextSequence();
1✔
704
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
705
        long generation = sequence / scaledWeights.length;
1✔
706
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
707
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
708
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
709
          continue;
1✔
710
        }
711
        return backendIndex;
1✔
712
      }
713
    }
714
  }
715

716
  static final class WeightedRoundRobinLoadBalancerConfig {
717
    final long blackoutPeriodNanos;
718
    final long weightExpirationPeriodNanos;
719
    final boolean enableOobLoadReport;
720
    final long oobReportingPeriodNanos;
721
    final long weightUpdatePeriodNanos;
722
    final float errorUtilizationPenalty;
723

724
    public static Builder newBuilder() {
725
      return new Builder();
1✔
726
    }
727

728
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
729
                                                 long weightExpirationPeriodNanos,
730
                                                 boolean enableOobLoadReport,
731
                                                 long oobReportingPeriodNanos,
732
                                                 long weightUpdatePeriodNanos,
733
                                                 float errorUtilizationPenalty) {
1✔
734
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
735
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
736
      this.enableOobLoadReport = enableOobLoadReport;
1✔
737
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
738
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
739
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
740
    }
1✔
741

742
    @Override
743
    public boolean equals(Object o) {
744
      if (!(o instanceof WeightedRoundRobinLoadBalancerConfig)) {
1✔
745
        return false;
1✔
746
      }
747
      WeightedRoundRobinLoadBalancerConfig that = (WeightedRoundRobinLoadBalancerConfig) o;
1✔
748
      return this.blackoutPeriodNanos == that.blackoutPeriodNanos
1✔
749
          && this.weightExpirationPeriodNanos == that.weightExpirationPeriodNanos
750
          && this.enableOobLoadReport == that.enableOobLoadReport
751
          && this.oobReportingPeriodNanos == that.oobReportingPeriodNanos
752
          && this.weightUpdatePeriodNanos == that.weightUpdatePeriodNanos
753
          // Float.compare considers NaNs equal
754
          && Float.compare(this.errorUtilizationPenalty, that.errorUtilizationPenalty) == 0;
1✔
755
    }
756

757
    @Override
758
    public int hashCode() {
759
      return Objects.hash(
1✔
760
          blackoutPeriodNanos,
1✔
761
          weightExpirationPeriodNanos,
1✔
762
          enableOobLoadReport,
1✔
763
          oobReportingPeriodNanos,
1✔
764
          weightUpdatePeriodNanos,
1✔
765
          errorUtilizationPenalty);
1✔
766
    }
767

768
    static final class Builder {
769
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
770
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
771
      boolean enableOobLoadReport = false;
1✔
772
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
773
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
774
      float errorUtilizationPenalty = 1.0F;
1✔
775

776
      private Builder() {
1✔
777

778
      }
1✔
779

780
      @SuppressWarnings("UnusedReturnValue")
781
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
782
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
783
        return this;
1✔
784
      }
785

786
      @SuppressWarnings("UnusedReturnValue")
787
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
788
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
789
        return this;
1✔
790
      }
791

792
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
793
        this.enableOobLoadReport = enableOobLoadReport;
1✔
794
        return this;
1✔
795
      }
796

797
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
798
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
799
        return this;
1✔
800
      }
801

802
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
803
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
804
        return this;
1✔
805
      }
806

807
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
808
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
809
        return this;
1✔
810
      }
811

812
      WeightedRoundRobinLoadBalancerConfig build() {
813
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
814
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
815
                weightUpdatePeriodNanos, errorUtilizationPenalty);
816
      }
817
    }
818
  }
819
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc