• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19701

18 Feb 2025 03:33PM UTC coverage: 88.592% (-0.001%) from 88.593%
#19701

push

github

ejona86
util: Use acceptResolvedAddresses() for MultiChildLb children

A failing Status from acceptResolvedAddresses means something is wrong
with the config, but parts of the config may still have been applied.
Thus there are now two possible flows: errors that should prevent
updateOverallBalancingState() and errors that should have no effect
other than the return code. To manage that, MultiChildLb must always be
responsible for calling updateOverallBalancingState().
acceptResolvedAddressesInternal() was inlined to make that error
processing easier. No existing usages actually needed to have logic
between updating the children and regenerating the picker.

RingHashLb was already verifying that the address list was not empty, so
the short-circuiting when acceptResolvedAddressesInternal() returned an
error was impossible to trigger. WrrLb's updateWeightTask() calls the
last picker, so it can run before acceptResolvedAddressesInternal(); the
only part that matters is re-creating the weightUpdateTimer.

34238 of 38647 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.37
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.Lists;
27
import io.grpc.ConnectivityState;
28
import io.grpc.ConnectivityStateInfo;
29
import io.grpc.Deadline.Ticker;
30
import io.grpc.DoubleHistogramMetricInstrument;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.LongCounterMetricInstrument;
35
import io.grpc.MetricInstrumentRegistry;
36
import io.grpc.NameResolver;
37
import io.grpc.Status;
38
import io.grpc.SynchronizationContext;
39
import io.grpc.SynchronizationContext.ScheduledHandle;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingSubchannel;
42
import io.grpc.util.MultiChildLoadBalancer;
43
import io.grpc.xds.orca.OrcaOobUtil;
44
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
45
import io.grpc.xds.orca.OrcaPerRequestUtil;
46
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
47
import java.util.ArrayList;
48
import java.util.Collection;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Random;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.TimeUnit;
55
import java.util.concurrent.atomic.AtomicInteger;
56
import java.util.logging.Level;
57
import java.util.logging.Logger;
58

59
/**
60
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
61
 * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
62
 * determined by backend metrics using ORCA.
63
 * To use WRR, users may configure through channel serviceConfig. Example config:
64
 * <pre> {@code
65
 *       String wrrConfig = "{\"loadBalancingConfig\":" +
66
 *           "[{\"weighted_round_robin\":{\"enableOobLoadReport\":true, " +
67
 *           "\"blackoutPeriod\":\"10s\"," +
68
 *           "\"oobReportingPeriod\":\"10s\"," +
69
 *           "\"weightExpirationPeriod\":\"180s\"," +
70
 *           "\"errorUtilizationPenalty\":\"1.0\"," +
71
 *           "\"weightUpdatePeriod\":\"1s\"}}]}";
72
 *        serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
73
 *        channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
74
 *            .defaultServiceConfig(serviceConfig)
75
 *            .build();
76
 *  }
77
 *  </pre>
78
 *  Users may also configure WRR through the xDS control plane via a custom LB policy, but that is much more
79
 *  complex to set up. Example config:
80
 *  <pre>
81
 *  localityLbPolicies:
82
 *   - customPolicy:
83
 *       name: weighted_round_robin
84
 *       data: '{ "enableOobLoadReport": true }'
85
 *  </pre>
86
 *  See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config
87
 */
88
final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
89

90
  private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
91
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
92
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
93
  private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
94
  private static final Logger log = Logger.getLogger(
1✔
95
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
96
  private WeightedRoundRobinLoadBalancerConfig config;
97
  private final SynchronizationContext syncContext;
98
  private final ScheduledExecutorService timeService;
99
  private ScheduledHandle weightUpdateTimer;
100
  private final Runnable updateWeightTask;
101
  private final AtomicInteger sequence;
102
  private final long infTime;
103
  private final Ticker ticker;
104
  private String locality = "";
1✔
105
  private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
106

107
  // Metric instruments are registered a single time, when this class is initialized, and are
  // shared by all instances of this LB.
  static {
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    RR_FALLBACK_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.rr_fallback",
        "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
            + "with valid weight, which caused the WRR policy to fall back to RR behavior",
        "{update}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
        "EXPERIMENTAL. Number of endpoints "
            + "from each scheduler update that don't yet have usable weight information",
        "{endpoint}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    ENDPOINT_WEIGHT_STALE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_stale",
        "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
            + "older than the expiration period",
        "{endpoint}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    // An empty bucket-boundary list is passed for the histogram.
    ENDPOINT_WEIGHTS_HISTOGRAM = registry.registerDoubleHistogram(
        "grpc.lb.wrr.endpoint_weights",
        "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
        "{weight}",
        Lists.newArrayList(),
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);
  }
133

134
  /**
   * Creates a WRR balancer whose pick sequence starts at a value drawn from a new
   * {@link Random}.
   */
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
    this(helper, ticker, new Random());
  }

  /**
   * Creates a WRR balancer; visible for tests so the starting pick sequence can be controlled.
   *
   * @param helper channel helper, passed to the superclass wrapped by
   *     {@code OrcaOobUtil.newOrcaReportingHelper}
   * @param ticker time source for weight expiration and blackout checks
   * @param random source of the initial value of the shared pick sequence counter
   * @throws NullPointerException if {@code ticker}, or the helper's synchronization context or
   *     scheduled executor, is null
   */
  @VisibleForTesting
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
    super(OrcaOobUtil.newOrcaReportingHelper(helper));
    this.ticker = checkNotNull(ticker, "ticker");
    // Sentinel timestamp stored in WeightedChildLbState.nonEmptySince (see getWeight and
    // onLoadReport).
    this.infTime = ticker.nanoTime() + Long.MAX_VALUE;
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
    this.sequence = new AtomicInteger(random.nextInt());
    this.updateWeightTask = new UpdateWeightTask();
    log.log(Level.FINE, "weighted_round_robin LB created");
  }
149

150
  @Override
151
  protected ChildLbState createChildLbState(Object key) {
152
    return new WeightedChildLbState(key, pickFirstLbProvider);
1✔
153
  }
154

155
  @Override
156
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
157
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
158
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
159
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
160
                      + resolvedAddresses.getAddresses()
1✔
161
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
162
      handleNameResolutionError(unavailableStatus);
1✔
163
      return unavailableStatus;
1✔
164
    }
165
    String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
1✔
166
    if (locality != null) {
1✔
167
      this.locality = locality;
1✔
168
    } else {
169
      this.locality = "";
1✔
170
    }
171
    config =
1✔
172
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
173

174
    if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
175
      weightUpdateTimer.cancel();
1✔
176
    }
177
    updateWeightTask.run();
1✔
178

179
    Status status = super.acceptResolvedAddresses(resolvedAddresses);
1✔
180

181
    createAndApplyOrcaListeners();
1✔
182

183
    return status;
1✔
184
  }
185

186
  /**
187
   * Updates picker with the list of active subchannels (state == READY).
188
   */
189
  @Override
190
  protected void updateOverallBalancingState() {
191
    List<ChildLbState> activeList = getReadyChildren();
1✔
192
    if (activeList.isEmpty()) {
1✔
193
      // No READY subchannels
194

195
      // MultiChildLB will request connection immediately on subchannel IDLE.
196
      boolean isConnecting = false;
1✔
197
      for (ChildLbState childLbState : getChildLbStates()) {
1✔
198
        ConnectivityState state = childLbState.getCurrentState();
1✔
199
        if (state == ConnectivityState.CONNECTING || state == ConnectivityState.IDLE) {
1✔
200
          isConnecting = true;
1✔
201
          break;
1✔
202
        }
203
      }
1✔
204

205
      if (isConnecting) {
1✔
206
        updateBalancingState(
1✔
207
            ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
208
      } else {
209
        updateBalancingState(
1✔
210
            ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
1✔
211
      }
212
    } else {
1✔
213
      updateBalancingState(ConnectivityState.READY, createReadyPicker(activeList));
1✔
214
    }
215
  }
1✔
216

217
  private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
218
    WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
219
        config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
220
    updateWeight(picker);
1✔
221
    return picker;
1✔
222
  }
223

224
  private void updateWeight(WeightedRoundRobinPicker picker) {
225
    Helper helper = getHelper();
1✔
226
    float[] newWeights = new float[picker.children.size()];
1✔
227
    AtomicInteger staleEndpoints = new AtomicInteger();
1✔
228
    AtomicInteger notYetUsableEndpoints = new AtomicInteger();
1✔
229
    for (int i = 0; i < picker.children.size(); i++) {
1✔
230
      double newWeight = ((WeightedChildLbState) picker.children.get(i)).getWeight(staleEndpoints,
1✔
231
          notYetUsableEndpoints);
232
      helper.getMetricRecorder()
1✔
233
          .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
1✔
234
              ImmutableList.of(helper.getChannelTarget()),
1✔
235
              ImmutableList.of(locality));
1✔
236
      newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
237
    }
238

239
    if (staleEndpoints.get() > 0) {
1✔
240
      helper.getMetricRecorder()
1✔
241
          .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
1✔
242
              ImmutableList.of(helper.getChannelTarget()),
1✔
243
              ImmutableList.of(locality));
1✔
244
    }
245
    if (notYetUsableEndpoints.get() > 0) {
1✔
246
      helper.getMetricRecorder()
1✔
247
          .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
1✔
248
              ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
1✔
249
    }
250
    boolean weightsEffective = picker.updateWeight(newWeights);
1✔
251
    if (!weightsEffective) {
1✔
252
      helper.getMetricRecorder()
1✔
253
          .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
1✔
254
              ImmutableList.of(locality));
1✔
255
    }
256
  }
1✔
257

258
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
259
    if (state != currentConnectivityState || !picker.equals(currentPicker)) {
1✔
260
      getHelper().updateBalancingState(state, picker);
1✔
261
      currentConnectivityState = state;
1✔
262
      currentPicker = picker;
1✔
263
    }
264
  }
1✔
265

266
  @VisibleForTesting
267
  final class WeightedChildLbState extends ChildLbState {
268

269
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
270
    private volatile long lastUpdated;
271
    private volatile long nonEmptySince;
272
    private volatile double weight = 0;
1✔
273

274
    private OrcaReportListener orcaReportListener;
275

276
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider) {
1✔
277
      super(key, policyProvider);
1✔
278
    }
1✔
279

280
    @Override
281
    protected ChildLbStateHelper createChildHelper() {
282
      return new WrrChildLbStateHelper();
1✔
283
    }
284

285
    private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
286
      if (config == null) {
1✔
287
        return 0;
×
288
      }
289
      long now = ticker.nanoTime();
1✔
290
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
291
        nonEmptySince = infTime;
1✔
292
        staleEndpoints.incrementAndGet();
1✔
293
        return 0;
1✔
294
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
295
          && config.blackoutPeriodNanos > 0) {
1✔
296
        notYetUsableEndpoints.incrementAndGet();
1✔
297
        return 0;
1✔
298
      } else {
299
        return weight;
1✔
300
      }
301
    }
302

303
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
304
      subchannels.add(wrrSubchannel);
1✔
305
    }
1✔
306

307
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
308
      if (orcaReportListener != null
1✔
309
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
310
        return orcaReportListener;
1✔
311
      }
312
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
313
      return orcaReportListener;
1✔
314
    }
315

316
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
317
      subchannels.remove(wrrSubchannel);
1✔
318
    }
1✔
319

320
    final class WrrChildLbStateHelper extends ChildLbStateHelper {
1✔
321
      @Override
322
      public Subchannel createSubchannel(CreateSubchannelArgs args) {
323
        return new WrrSubchannel(super.createSubchannel(args), WeightedChildLbState.this);
1✔
324
      }
325

326
      @Override
327
      public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
328
        super.updateBalancingState(newState, newPicker);
1✔
329
        if (!resolvingAddresses && newState == ConnectivityState.IDLE) {
1✔
330
          getLb().requestConnection();
×
331
        }
332
      }
1✔
333
    }
334

335
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
336
      private final float errorUtilizationPenalty;
337

338
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
339
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
340
      }
1✔
341

342
      @Override
343
      public void onLoadReport(MetricReport report) {
344
        double newWeight = 0;
1✔
345
        // Prefer application utilization and fallback to CPU utilization if unset.
346
        double utilization =
347
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
348
                : report.getCpuUtilization();
1✔
349
        if (utilization > 0 && report.getQps() > 0) {
1✔
350
          double penalty = 0;
1✔
351
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
352
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
353
          }
354
          newWeight = report.getQps() / (utilization + penalty);
1✔
355
        }
356
        if (newWeight == 0) {
1✔
357
          return;
1✔
358
        }
359
        if (nonEmptySince == infTime) {
1✔
360
          nonEmptySince = ticker.nanoTime();
1✔
361
        }
362
        lastUpdated = ticker.nanoTime();
1✔
363
        weight = newWeight;
1✔
364
      }
1✔
365
    }
366
  }
367

368
  private final class UpdateWeightTask implements Runnable {
1✔
369
    @Override
370
    public void run() {
371
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
372
        updateWeight((WeightedRoundRobinPicker) currentPicker);
1✔
373
      }
374
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
375
          TimeUnit.NANOSECONDS, timeService);
1✔
376
    }
1✔
377
  }
378

379
  private void createAndApplyOrcaListeners() {
380
    for (ChildLbState child : getChildLbStates()) {
1✔
381
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
382
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
383
        if (config.enableOobLoadReport) {
1✔
384
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
385
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
386
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
387
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
388
                  .build());
1✔
389
        } else {
390
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
391
        }
392
      }
1✔
393
    }
1✔
394
  }
1✔
395

396
  @Override
397
  public void shutdown() {
398
    if (weightUpdateTimer != null) {
1✔
399
      weightUpdateTimer.cancel();
1✔
400
    }
401
    super.shutdown();
1✔
402
  }
1✔
403

404
  @VisibleForTesting
405
  final class WrrSubchannel extends ForwardingSubchannel {
406
    private final Subchannel delegate;
407
    private final WeightedChildLbState owner;
408

409
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
410
      this.delegate = checkNotNull(delegate, "delegate");
1✔
411
      this.owner = checkNotNull(owner, "owner");
1✔
412
    }
1✔
413

414
    @Override
415
    public void start(SubchannelStateListener listener) {
416
      owner.addSubchannel(this);
1✔
417
      delegate().start(new SubchannelStateListener() {
1✔
418
        @Override
419
        public void onSubchannelState(ConnectivityStateInfo newState) {
420
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
421
            owner.nonEmptySince = infTime;
1✔
422
          }
423
          listener.onSubchannelState(newState);
1✔
424
        }
1✔
425
      });
426
    }
1✔
427

428
    @Override
429
    protected Subchannel delegate() {
430
      return delegate;
1✔
431
    }
432

433
    @Override
434
    public void shutdown() {
435
      super.shutdown();
1✔
436
      owner.removeSubchannel(this);
1✔
437
    }
1✔
438
  }
439

440
  @VisibleForTesting
441
  static final class WeightedRoundRobinPicker extends SubchannelPicker {
442
    // Parallel lists (column-based storage instead of normal row-based storage of List<Struct>).
443
    // The ith element of children corresponds to the ith element of pickers, listeners, and even
444
    // updateWeight(float[]).
445
    private final List<ChildLbState> children; // May only be accessed from sync context
446
    private final List<SubchannelPicker> pickers;
447
    private final List<OrcaPerRequestReportListener> reportListeners;
448
    private final boolean enableOobLoadReport;
449
    private final float errorUtilizationPenalty;
450
    private final AtomicInteger sequence;
451
    private final int hashCode;
452
    private volatile StaticStrideScheduler scheduler;
453

454
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
455
        float errorUtilizationPenalty, AtomicInteger sequence) {
1✔
456
      checkNotNull(children, "children");
1✔
457
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
458
      this.children = children;
1✔
459
      List<SubchannelPicker> pickers = new ArrayList<>(children.size());
1✔
460
      List<OrcaPerRequestReportListener> reportListeners = new ArrayList<>(children.size());
1✔
461
      for (ChildLbState child : children) {
1✔
462
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
463
        pickers.add(wChild.getCurrentPicker());
1✔
464
        reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
465
      }
1✔
466
      this.pickers = pickers;
1✔
467
      this.reportListeners = reportListeners;
1✔
468
      this.enableOobLoadReport = enableOobLoadReport;
1✔
469
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
470
      this.sequence = checkNotNull(sequence, "sequence");
1✔
471

472
      // For equality we treat pickers as a set; use hash code as defined by Set
473
      int sum = 0;
1✔
474
      for (SubchannelPicker picker : pickers) {
1✔
475
        sum += picker.hashCode();
1✔
476
      }
1✔
477
      this.hashCode = sum
1✔
478
          ^ Boolean.hashCode(enableOobLoadReport)
1✔
479
          ^ Float.hashCode(errorUtilizationPenalty);
1✔
480
    }
1✔
481

482
    @Override
483
    public PickResult pickSubchannel(PickSubchannelArgs args) {
484
      int pick = scheduler.pick();
1✔
485
      PickResult pickResult = pickers.get(pick).pickSubchannel(args);
1✔
486
      Subchannel subchannel = pickResult.getSubchannel();
1✔
487
      if (subchannel == null) {
1✔
488
        return pickResult;
1✔
489
      }
490
      if (!enableOobLoadReport) {
1✔
491
        return PickResult.withSubchannel(subchannel,
1✔
492
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
493
                reportListeners.get(pick)));
1✔
494
      } else {
495
        return PickResult.withSubchannel(subchannel);
1✔
496
      }
497
    }
498

499
    /** Returns {@code true} if weights are different than round_robin. */
500
    private boolean updateWeight(float[] newWeights) {
501
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
502
      return !this.scheduler.usesRoundRobin();
1✔
503
    }
504

505
    @Override
506
    public String toString() {
507
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
508
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
509
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
510
          .add("pickers", pickers)
1✔
511
          .toString();
1✔
512
    }
513

514
    @VisibleForTesting
515
    List<ChildLbState> getChildren() {
516
      return children;
1✔
517
    }
518

519
    @Override
520
    public int hashCode() {
521
      return hashCode;
×
522
    }
523

524
    @Override
525
    public boolean equals(Object o) {
526
      if (!(o instanceof WeightedRoundRobinPicker)) {
1✔
527
        return false;
×
528
      }
529
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) o;
1✔
530
      if (other == this) {
1✔
531
        return true;
×
532
      }
533
      // the lists cannot contain duplicate subchannels
534
      return hashCode == other.hashCode
1✔
535
          && sequence == other.sequence
536
          && enableOobLoadReport == other.enableOobLoadReport
537
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
538
          && pickers.size() == other.pickers.size()
1✔
539
          && new HashSet<>(pickers).containsAll(other.pickers);
1✔
540
    }
541
  }
542

543
  /*
544
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
545
   * in which each object's deadline is the multiplicative inverse of the object's weight.
546
   * <p>
547
   * The way in which this is implemented is through a static stride scheduler. 
548
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
549
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
550
   * with higher weights. It is based on the observation that the intended sequence generated 
551
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
552
   * The Static Stride Scheduler is more performant than other implementations of the EDF
553
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
554
   * <p>
555
   * go/static-stride-scheduler
556
   * <p>
557
   *
558
   * <ul>
559
   *  <li>nextSequence() - O(1)
560
   *  <li>pick() - O(n)
561
   */
562
  @VisibleForTesting
563
  static final class StaticStrideScheduler {
564
    private final short[] scaledWeights;
565
    private final AtomicInteger sequence;
566
    private final boolean usesRoundRobin;
567
    private static final int K_MAX_WEIGHT = 0xFFFF;
568

569
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
570
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
571
    //
572
    // This is done as a performance optimization by limiting the number of rounds for picks
573
    // for edge cases where channels have large differences in subchannel weights.
574
    // In this case, without these clips, it would potentially require the scheduler to
575
    // frequently traverse through the entire subchannel list within the pick method.
576
    //
577
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
578
    // decrease the amount of sequences that the scheduler must traverse through in order
579
    // to pick a high weight subchannel in such corner cases.
580
    // But, it also makes WeightedRoundRobin to send slightly more requests to
581
    // potentially very bad tasks (that would have near-zero weights) than zero.
582
    // This is not necessarily a downside, though. Perhaps this is not a problem at
583
    // all, and we can increase this value if needed to save CPU cycles.
584
    private static final double K_MAX_RATIO = 10;
585
    private static final double K_MIN_RATIO = 0.1;
586

587
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
588
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
589
      int numChannels = weights.length;
1✔
590
      int numWeightedChannels = 0;
1✔
591
      double sumWeight = 0;
1✔
592
      double unscaledMeanWeight;
593
      float unscaledMaxWeight = 0;
1✔
594
      for (float weight : weights) {
1✔
595
        if (weight > 0) {
1✔
596
          sumWeight += weight;
1✔
597
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
598
          numWeightedChannels++;
1✔
599
        }
600
      }
601

602
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
603
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
604
      if (numWeightedChannels > 0) {
1✔
605
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
606
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
607
      } else {
608
        // Fall back to round robin if all values are non-positives. Note that
609
        // numWeightedChannels == 1 also behaves like RR because the weights are all the same, but
610
        // the weights aren't 1, so it doesn't go through this path.
611
        unscaledMeanWeight = 1;
1✔
612
        unscaledMaxWeight = 1;
1✔
613
      }
614
      // We need at least two weights for WRR to be distinguishable from round_robin.
615
      usesRoundRobin = numWeightedChannels < 2;
1✔
616

617
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
618
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
619
      // match the actual mean of the values that end up in the scheduler.
620
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
621
      // We compute weightLowerBound and clamp it to 1 from below so that in the
622
      // worst case, we represent tiny weights as 1.
623
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
624
      short[] scaledWeights = new short[numChannels];
1✔
625
      for (int i = 0; i < numChannels; i++) {
1✔
626
        if (weights[i] <= 0) {
1✔
627
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
628
        } else {
629
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
630
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
631
        }
632
      }
633

634
      this.scaledWeights = scaledWeights;
1✔
635
      this.sequence = sequence;
1✔
636
    }
1✔
637

638
    // Without properly weighted channels, we do plain vanilla round_robin.
639
    boolean usesRoundRobin() {
640
      return usesRoundRobin;
1✔
641
    }
642

643
    /**
644
     * Returns the next sequence number and atomically increases sequence with wraparound.
645
     */
646
    private long nextSequence() {
647
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
648
    }
649

650
    /*
651
     * Selects index of next backend server.
652
     * <p>
653
     * A 2D array is compactly represented as a function of W(backend), where the row
654
     * represents the generation and the column represents the backend index:
655
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
656
     * Each element in the conceptual array is a boolean indicating whether the backend at
657
     * this index should be picked now. If false, the counter is incremented again,
658
     * and the new element is checked. An atomically incremented counter keeps track of our
659
     * backend and generation through modular arithmetic within the pick() method.
660
     * <p>
661
     * Modular arithmetic allows us to evenly distribute picks and skips between
662
     * generations based on W(backend).
663
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
664
     * If we have the same three backends with weights:
665
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
666
     * <p>
667
     * B0    B1    B2
668
     * T     T     T
669
     * F     F     T
670
     * F     T     T
671
     * T     F     T
672
     * F     T     T
673
     * F     F     T
674
     * The sequence of picked backend indices is given by
675
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
676
     * <p>
677
     * To reduce the variance and spread the wasted work among different picks,
678
     * an offset that varies per backend index is also included to the calculation.
679
     */
680
    int pick() {
681
      while (true) {
682
        long sequence = this.nextSequence();
1✔
683
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
684
        long generation = sequence / scaledWeights.length;
1✔
685
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
686
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
687
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
688
          continue;
1✔
689
        }
690
        return backendIndex;
1✔
691
      }
692
    }
693
  }
694

695
  static final class WeightedRoundRobinLoadBalancerConfig {
696
    final long blackoutPeriodNanos;
697
    final long weightExpirationPeriodNanos;
698
    final boolean enableOobLoadReport;
699
    final long oobReportingPeriodNanos;
700
    final long weightUpdatePeriodNanos;
701
    final float errorUtilizationPenalty;
702

703
    public static Builder newBuilder() {
704
      return new Builder();
1✔
705
    }
706

707
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
708
                                                 long weightExpirationPeriodNanos,
709
                                                 boolean enableOobLoadReport,
710
                                                 long oobReportingPeriodNanos,
711
                                                 long weightUpdatePeriodNanos,
712
                                                 float errorUtilizationPenalty) {
1✔
713
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
714
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
715
      this.enableOobLoadReport = enableOobLoadReport;
1✔
716
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
717
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
718
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
719
    }
1✔
720

721
    static final class Builder {
722
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
723
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
724
      boolean enableOobLoadReport = false;
1✔
725
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
726
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
727
      float errorUtilizationPenalty = 1.0F;
1✔
728

729
      private Builder() {
1✔
730

731
      }
1✔
732

733
      @SuppressWarnings("UnusedReturnValue")
734
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
735
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
736
        return this;
1✔
737
      }
738

739
      @SuppressWarnings("UnusedReturnValue")
740
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
741
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
742
        return this;
1✔
743
      }
744

745
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
746
        this.enableOobLoadReport = enableOobLoadReport;
1✔
747
        return this;
1✔
748
      }
749

750
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
751
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
752
        return this;
1✔
753
      }
754

755
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
756
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
757
        return this;
1✔
758
      }
759

760
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
761
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
762
        return this;
1✔
763
      }
764

765
      WeightedRoundRobinLoadBalancerConfig build() {
766
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
767
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
768
                weightUpdatePeriodNanos, errorUtilizationPenalty);
769
      }
770
    }
771
  }
772
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc