• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19785

21 Apr 2025 01:17PM UTC coverage: 88.591% (-0.008%) from 88.599%
#19785

push

github

web-flow
Implement grpc.lb.backend_service optional label

This completes gRFC A89. 7162d2d66 and fc86084df had already implemented
the LB plumbing for the optional label on RPC metrics. This observes the
value in OpenTelemetry and adds it to WRR metrics as well.

https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md

34747 of 39222 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.42
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.Lists;
27
import io.grpc.ConnectivityState;
28
import io.grpc.ConnectivityStateInfo;
29
import io.grpc.Deadline.Ticker;
30
import io.grpc.DoubleHistogramMetricInstrument;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.LongCounterMetricInstrument;
35
import io.grpc.MetricInstrumentRegistry;
36
import io.grpc.NameResolver;
37
import io.grpc.Status;
38
import io.grpc.SynchronizationContext;
39
import io.grpc.SynchronizationContext.ScheduledHandle;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingSubchannel;
42
import io.grpc.util.MultiChildLoadBalancer;
43
import io.grpc.xds.orca.OrcaOobUtil;
44
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
45
import io.grpc.xds.orca.OrcaPerRequestUtil;
46
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
47
import java.util.ArrayList;
48
import java.util.Collection;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Random;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.TimeUnit;
55
import java.util.concurrent.atomic.AtomicInteger;
56
import java.util.logging.Level;
57
import java.util.logging.Logger;
58

59
/**
60
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
61
 * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
62
 * determined by backend metrics using ORCA.
63
 * To use WRR, users may configure through channel serviceConfig. Example config:
64
 * <pre> {@code
65
 *       String wrrConfig = "{\"loadBalancingConfig\":" +
66
 *           "[{\"weighted_round_robin\":{\"enableOobLoadReport\":true, " +
67
 *           "\"blackoutPeriod\":\"10s\"," +
68
 *           "\"oobReportingPeriod\":\"10s\"," +
69
 *           "\"weightExpirationPeriod\":\"180s\"," +
70
 *           "\"errorUtilizationPenalty\":\"1.0\"," +
71
 *           "\"weightUpdatePeriod\":\"1s\"}}]}";
72
 *        serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
73
 *        channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
74
 *            .defaultServiceConfig(serviceConfig)
75
 *            .build();
76
 *  }
77
 *  </pre>
78
 *  Users may also configure through xDS control plane via custom lb policy. But that is much more
79
 *  complex to set up. Example config:
80
 *  <pre>
81
 *  localityLbPolicies:
82
 *   - customPolicy:
83
 *       name: weighted_round_robin
84
 *       data: '{ "enableOobLoadReport": true }'
85
 *  </pre>
86
 *  See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config
87
 */
88
final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
89

90
  private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
91
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
92
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
93
  private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
94
  private static final Logger log = Logger.getLogger(
1✔
95
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
96
  private WeightedRoundRobinLoadBalancerConfig config;
97
  private final SynchronizationContext syncContext;
98
  private final ScheduledExecutorService timeService;
99
  private ScheduledHandle weightUpdateTimer;
100
  private final Runnable updateWeightTask;
101
  private final AtomicInteger sequence;
102
  private final long infTime;
103
  private final Ticker ticker;
104
  private String locality = "";
1✔
105
  private String backendService = "";
1✔
106
  private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
107

108
  // The metric instruments are only registered once and shared by all instances of this LB.
109
  static {
110
    MetricInstrumentRegistry metricInstrumentRegistry
111
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
112
    RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
113
        "grpc.lb.wrr.rr_fallback",
114
        "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
115
            + "with valid weight, which caused the WRR policy to fall back to RR behavior",
116
        "{update}",
117
        Lists.newArrayList("grpc.target"),
1✔
118
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
119
        false);
120
    ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
121
        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
122
        "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable "
123
            + "weight information",
124
        "{endpoint}",
125
        Lists.newArrayList("grpc.target"),
1✔
126
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
127
        false);
128
    ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
129
        "grpc.lb.wrr.endpoint_weight_stale",
130
        "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
131
            + "older than the expiration period",
132
        "{endpoint}",
133
        Lists.newArrayList("grpc.target"),
1✔
134
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
135
        false);
136
    ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
1✔
137
        "grpc.lb.wrr.endpoint_weights",
138
        "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
139
        "{weight}",
140
        Lists.newArrayList(),
1✔
141
        Lists.newArrayList("grpc.target"),
1✔
142
        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
1✔
143
        false);
144
  }
1✔
145

146
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
147
    this(helper, ticker, new Random());
1✔
148
  }
1✔
149

150
  @VisibleForTesting
151
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
152
    super(OrcaOobUtil.newOrcaReportingHelper(helper));
1✔
153
    this.ticker = checkNotNull(ticker, "ticker");
1✔
154
    this.infTime = ticker.nanoTime() + Long.MAX_VALUE;
1✔
155
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
156
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
157
    this.updateWeightTask = new UpdateWeightTask();
1✔
158
    this.sequence = new AtomicInteger(random.nextInt());
1✔
159
    log.log(Level.FINE, "weighted_round_robin LB created");
1✔
160
  }
1✔
161

162
  @Override
163
  protected ChildLbState createChildLbState(Object key) {
164
    return new WeightedChildLbState(key, pickFirstLbProvider);
1✔
165
  }
166

167
  @Override
168
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
169
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
170
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
171
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
172
                      + resolvedAddresses.getAddresses()
1✔
173
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
174
      handleNameResolutionError(unavailableStatus);
1✔
175
      return unavailableStatus;
1✔
176
    }
177
    String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
1✔
178
    if (locality != null) {
1✔
179
      this.locality = locality;
1✔
180
    } else {
181
      this.locality = "";
1✔
182
    }
183
    String backendService
1✔
184
        = resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE);
1✔
185
    if (backendService != null) {
1✔
186
      this.backendService = backendService;
1✔
187
    } else {
188
      this.backendService = "";
1✔
189
    }
190
    config =
1✔
191
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
192

193
    if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
194
      weightUpdateTimer.cancel();
1✔
195
    }
196
    updateWeightTask.run();
1✔
197

198
    Status status = super.acceptResolvedAddresses(resolvedAddresses);
1✔
199

200
    createAndApplyOrcaListeners();
1✔
201

202
    return status;
1✔
203
  }
204

205
  /**
206
   * Updates picker with the list of active subchannels (state == READY).
207
   */
208
  @Override
209
  protected void updateOverallBalancingState() {
210
    List<ChildLbState> activeList = getReadyChildren();
1✔
211
    if (activeList.isEmpty()) {
1✔
212
      // No READY subchannels
213

214
      // MultiChildLB will request connection immediately on subchannel IDLE.
215
      boolean isConnecting = false;
1✔
216
      for (ChildLbState childLbState : getChildLbStates()) {
1✔
217
        ConnectivityState state = childLbState.getCurrentState();
1✔
218
        if (state == ConnectivityState.CONNECTING || state == ConnectivityState.IDLE) {
1✔
219
          isConnecting = true;
1✔
220
          break;
1✔
221
        }
222
      }
1✔
223

224
      if (isConnecting) {
1✔
225
        updateBalancingState(
1✔
226
            ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
227
      } else {
228
        updateBalancingState(
1✔
229
            ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
1✔
230
      }
231
    } else {
1✔
232
      updateBalancingState(ConnectivityState.READY, createReadyPicker(activeList));
1✔
233
    }
234
  }
1✔
235

236
  private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
237
    WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
238
        config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
239
    updateWeight(picker);
1✔
240
    return picker;
1✔
241
  }
242

243
  private void updateWeight(WeightedRoundRobinPicker picker) {
244
    Helper helper = getHelper();
1✔
245
    float[] newWeights = new float[picker.children.size()];
1✔
246
    AtomicInteger staleEndpoints = new AtomicInteger();
1✔
247
    AtomicInteger notYetUsableEndpoints = new AtomicInteger();
1✔
248
    for (int i = 0; i < picker.children.size(); i++) {
1✔
249
      double newWeight = ((WeightedChildLbState) picker.children.get(i)).getWeight(staleEndpoints,
1✔
250
          notYetUsableEndpoints);
251
      helper.getMetricRecorder()
1✔
252
          .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
1✔
253
              ImmutableList.of(helper.getChannelTarget()),
1✔
254
              ImmutableList.of(locality, backendService));
1✔
255
      newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
256
    }
257

258
    if (staleEndpoints.get() > 0) {
1✔
259
      helper.getMetricRecorder()
1✔
260
          .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
1✔
261
              ImmutableList.of(helper.getChannelTarget()),
1✔
262
              ImmutableList.of(locality, backendService));
1✔
263
    }
264
    if (notYetUsableEndpoints.get() > 0) {
1✔
265
      helper.getMetricRecorder()
1✔
266
          .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
1✔
267
              ImmutableList.of(helper.getChannelTarget()),
1✔
268
              ImmutableList.of(locality, backendService));
1✔
269
    }
270
    boolean weightsEffective = picker.updateWeight(newWeights);
1✔
271
    if (!weightsEffective) {
1✔
272
      helper.getMetricRecorder()
1✔
273
          .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
1✔
274
              ImmutableList.of(locality, backendService));
1✔
275
    }
276
  }
1✔
277

278
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
279
    if (state != currentConnectivityState || !picker.equals(currentPicker)) {
1✔
280
      getHelper().updateBalancingState(state, picker);
1✔
281
      currentConnectivityState = state;
1✔
282
      currentPicker = picker;
1✔
283
    }
284
  }
1✔
285

286
  @VisibleForTesting
287
  final class WeightedChildLbState extends ChildLbState {
288

289
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
290
    private volatile long lastUpdated;
291
    private volatile long nonEmptySince;
292
    private volatile double weight = 0;
1✔
293

294
    private OrcaReportListener orcaReportListener;
295

296
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider) {
1✔
297
      super(key, policyProvider);
1✔
298
    }
1✔
299

300
    @Override
301
    protected ChildLbStateHelper createChildHelper() {
302
      return new WrrChildLbStateHelper();
1✔
303
    }
304

305
    private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
306
      if (config == null) {
1✔
307
        return 0;
×
308
      }
309
      long now = ticker.nanoTime();
1✔
310
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
311
        nonEmptySince = infTime;
1✔
312
        staleEndpoints.incrementAndGet();
1✔
313
        return 0;
1✔
314
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
315
          && config.blackoutPeriodNanos > 0) {
1✔
316
        notYetUsableEndpoints.incrementAndGet();
1✔
317
        return 0;
1✔
318
      } else {
319
        return weight;
1✔
320
      }
321
    }
322

323
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
324
      subchannels.add(wrrSubchannel);
1✔
325
    }
1✔
326

327
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
328
      if (orcaReportListener != null
1✔
329
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
330
        return orcaReportListener;
1✔
331
      }
332
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
333
      return orcaReportListener;
1✔
334
    }
335

336
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
337
      subchannels.remove(wrrSubchannel);
1✔
338
    }
1✔
339

340
    final class WrrChildLbStateHelper extends ChildLbStateHelper {
1✔
341
      @Override
342
      public Subchannel createSubchannel(CreateSubchannelArgs args) {
343
        return new WrrSubchannel(super.createSubchannel(args), WeightedChildLbState.this);
1✔
344
      }
345

346
      @Override
347
      public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
348
        super.updateBalancingState(newState, newPicker);
1✔
349
        if (!resolvingAddresses && newState == ConnectivityState.IDLE) {
1✔
350
          getLb().requestConnection();
×
351
        }
352
      }
1✔
353
    }
354

355
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
356
      private final float errorUtilizationPenalty;
357

358
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
359
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
360
      }
1✔
361

362
      @Override
363
      public void onLoadReport(MetricReport report) {
364
        double newWeight = 0;
1✔
365
        // Prefer application utilization and fallback to CPU utilization if unset.
366
        double utilization =
367
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
368
                : report.getCpuUtilization();
1✔
369
        if (utilization > 0 && report.getQps() > 0) {
1✔
370
          double penalty = 0;
1✔
371
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
372
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
373
          }
374
          newWeight = report.getQps() / (utilization + penalty);
1✔
375
        }
376
        if (newWeight == 0) {
1✔
377
          return;
1✔
378
        }
379
        if (nonEmptySince == infTime) {
1✔
380
          nonEmptySince = ticker.nanoTime();
1✔
381
        }
382
        lastUpdated = ticker.nanoTime();
1✔
383
        weight = newWeight;
1✔
384
      }
1✔
385
    }
386
  }
387

388
  private final class UpdateWeightTask implements Runnable {
1✔
389
    @Override
390
    public void run() {
391
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
392
        updateWeight((WeightedRoundRobinPicker) currentPicker);
1✔
393
      }
394
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
395
          TimeUnit.NANOSECONDS, timeService);
1✔
396
    }
1✔
397
  }
398

399
  private void createAndApplyOrcaListeners() {
400
    for (ChildLbState child : getChildLbStates()) {
1✔
401
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
402
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
403
        if (config.enableOobLoadReport) {
1✔
404
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
405
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
406
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
407
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
408
                  .build());
1✔
409
        } else {
410
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
411
        }
412
      }
1✔
413
    }
1✔
414
  }
1✔
415

416
  @Override
417
  public void shutdown() {
418
    if (weightUpdateTimer != null) {
1✔
419
      weightUpdateTimer.cancel();
1✔
420
    }
421
    super.shutdown();
1✔
422
  }
1✔
423

424
  @VisibleForTesting
425
  final class WrrSubchannel extends ForwardingSubchannel {
426
    private final Subchannel delegate;
427
    private final WeightedChildLbState owner;
428

429
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
430
      this.delegate = checkNotNull(delegate, "delegate");
1✔
431
      this.owner = checkNotNull(owner, "owner");
1✔
432
    }
1✔
433

434
    @Override
435
    public void start(SubchannelStateListener listener) {
436
      owner.addSubchannel(this);
1✔
437
      delegate().start(new SubchannelStateListener() {
1✔
438
        @Override
439
        public void onSubchannelState(ConnectivityStateInfo newState) {
440
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
441
            owner.nonEmptySince = infTime;
1✔
442
          }
443
          listener.onSubchannelState(newState);
1✔
444
        }
1✔
445
      });
446
    }
1✔
447

448
    @Override
449
    protected Subchannel delegate() {
450
      return delegate;
1✔
451
    }
452

453
    @Override
454
    public void shutdown() {
455
      super.shutdown();
1✔
456
      owner.removeSubchannel(this);
1✔
457
    }
1✔
458
  }
459

460
  @VisibleForTesting
461
  static final class WeightedRoundRobinPicker extends SubchannelPicker {
462
    // Parallel lists (column-based storage instead of normal row-based storage of List<Struct>).
463
    // The ith element of children corresponds to the ith element of pickers, listeners, and even
464
    // updateWeight(float[]).
465
    private final List<ChildLbState> children; // May only be accessed from sync context
466
    private final List<SubchannelPicker> pickers;
467
    private final List<OrcaPerRequestReportListener> reportListeners;
468
    private final boolean enableOobLoadReport;
469
    private final float errorUtilizationPenalty;
470
    private final AtomicInteger sequence;
471
    private final int hashCode;
472
    private volatile StaticStrideScheduler scheduler;
473

474
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
475
        float errorUtilizationPenalty, AtomicInteger sequence) {
1✔
476
      checkNotNull(children, "children");
1✔
477
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
478
      this.children = children;
1✔
479
      List<SubchannelPicker> pickers = new ArrayList<>(children.size());
1✔
480
      List<OrcaPerRequestReportListener> reportListeners = new ArrayList<>(children.size());
1✔
481
      for (ChildLbState child : children) {
1✔
482
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
483
        pickers.add(wChild.getCurrentPicker());
1✔
484
        reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
485
      }
1✔
486
      this.pickers = pickers;
1✔
487
      this.reportListeners = reportListeners;
1✔
488
      this.enableOobLoadReport = enableOobLoadReport;
1✔
489
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
490
      this.sequence = checkNotNull(sequence, "sequence");
1✔
491

492
      // For equality we treat pickers as a set; use hash code as defined by Set
493
      int sum = 0;
1✔
494
      for (SubchannelPicker picker : pickers) {
1✔
495
        sum += picker.hashCode();
1✔
496
      }
1✔
497
      this.hashCode = sum
1✔
498
          ^ Boolean.hashCode(enableOobLoadReport)
1✔
499
          ^ Float.hashCode(errorUtilizationPenalty);
1✔
500
    }
1✔
501

502
    @Override
503
    public PickResult pickSubchannel(PickSubchannelArgs args) {
504
      int pick = scheduler.pick();
1✔
505
      PickResult pickResult = pickers.get(pick).pickSubchannel(args);
1✔
506
      Subchannel subchannel = pickResult.getSubchannel();
1✔
507
      if (subchannel == null) {
1✔
508
        return pickResult;
1✔
509
      }
510
      if (!enableOobLoadReport) {
1✔
511
        return PickResult.withSubchannel(subchannel,
1✔
512
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
513
                reportListeners.get(pick)));
1✔
514
      } else {
515
        return PickResult.withSubchannel(subchannel);
1✔
516
      }
517
    }
518

519
    /** Returns {@code true} if weights are different than round_robin. */
520
    private boolean updateWeight(float[] newWeights) {
521
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
522
      return !this.scheduler.usesRoundRobin();
1✔
523
    }
524

525
    @Override
526
    public String toString() {
527
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
528
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
529
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
530
          .add("pickers", pickers)
1✔
531
          .toString();
1✔
532
    }
533

534
    @VisibleForTesting
535
    List<ChildLbState> getChildren() {
536
      return children;
1✔
537
    }
538

539
    @Override
540
    public int hashCode() {
541
      return hashCode;
×
542
    }
543

544
    @Override
545
    public boolean equals(Object o) {
546
      if (!(o instanceof WeightedRoundRobinPicker)) {
1✔
547
        return false;
×
548
      }
549
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) o;
1✔
550
      if (other == this) {
1✔
551
        return true;
×
552
      }
553
      // the lists cannot contain duplicate subchannels
554
      return hashCode == other.hashCode
1✔
555
          && sequence == other.sequence
556
          && enableOobLoadReport == other.enableOobLoadReport
557
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
558
          && pickers.size() == other.pickers.size()
1✔
559
          && new HashSet<>(pickers).containsAll(other.pickers);
1✔
560
    }
561
  }
562

563
  /*
564
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
565
   * in which each object's deadline is the multiplicative inverse of the object's weight.
566
   * <p>
567
   * The way in which this is implemented is through a static stride scheduler. 
568
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
569
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
570
   * with higher weights. It is based on the observation that the intended sequence generated 
571
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
572
   * The Static Stride Scheduler is more performant than other implementations of the EDF
573
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
574
   * <p>
575
   * go/static-stride-scheduler
576
   * <p>
577
   *
578
   * <ul>
579
   *  <li>nextSequence() - O(1)
580
   *  <li>pick() - O(n)
581
   */
582
  @VisibleForTesting
583
  static final class StaticStrideScheduler {
584
    private final short[] scaledWeights;
585
    private final AtomicInteger sequence;
586
    private final boolean usesRoundRobin;
587
    private static final int K_MAX_WEIGHT = 0xFFFF;
588

589
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
590
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
591
    //
592
    // This is done as a performance optimization by limiting the number of rounds for picks
593
    // for edge cases where channels have large differences in subchannel weights.
594
    // In this case, without these clips, it would potentially require the scheduler to
595
    // frequently traverse through the entire subchannel list within the pick method.
596
    //
597
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
598
    // decrease the amount of sequences that the scheduler must traverse through in order
599
    // to pick a high weight subchannel in such corner cases.
600
    // But, it also makes WeightedRoundRobin to send slightly more requests to
601
    // potentially very bad tasks (that would have near-zero weights) than zero.
602
    // This is not necessarily a downside, though. Perhaps this is not a problem at
603
    // all, and we can increase this value if needed to save CPU cycles.
604
    private static final double K_MAX_RATIO = 10;
605
    private static final double K_MIN_RATIO = 0.1;
606

607
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
608
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
609
      int numChannels = weights.length;
1✔
610
      int numWeightedChannels = 0;
1✔
611
      double sumWeight = 0;
1✔
612
      double unscaledMeanWeight;
613
      float unscaledMaxWeight = 0;
1✔
614
      for (float weight : weights) {
1✔
615
        if (weight > 0) {
1✔
616
          sumWeight += weight;
1✔
617
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
618
          numWeightedChannels++;
1✔
619
        }
620
      }
621

622
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
623
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
624
      if (numWeightedChannels > 0) {
1✔
625
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
626
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
627
      } else {
628
        // Fall back to round robin if all values are non-positives. Note that
629
        // numWeightedChannels == 1 also behaves like RR because the weights are all the same, but
630
        // the weights aren't 1, so it doesn't go through this path.
631
        unscaledMeanWeight = 1;
1✔
632
        unscaledMaxWeight = 1;
1✔
633
      }
634
      // We need at least two weights for WRR to be distinguishable from round_robin.
635
      usesRoundRobin = numWeightedChannels < 2;
1✔
636

637
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
638
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
639
      // match the actual mean of the values that end up in the scheduler.
640
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
641
      // We compute weightLowerBound and clamp it to 1 from below so that in the
642
      // worst case, we represent tiny weights as 1.
643
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
644
      short[] scaledWeights = new short[numChannels];
1✔
645
      for (int i = 0; i < numChannels; i++) {
1✔
646
        if (weights[i] <= 0) {
1✔
647
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
648
        } else {
649
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
650
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
651
        }
652
      }
653

654
      this.scaledWeights = scaledWeights;
1✔
655
      this.sequence = sequence;
1✔
656
    }
1✔
657

658
    // Without properly weighted channels, we do plain vanilla round_robin.
659
    boolean usesRoundRobin() {
660
      return usesRoundRobin;
1✔
661
    }
662

663
    /**
664
     * Returns the next sequence number and atomically increases sequence with wraparound.
665
     */
666
    private long nextSequence() {
667
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
668
    }
669

670
    /*
     * Selects index of next backend server.
     * <p>
     * A 2D array is compactly represented as a function of W(backend), where the row
     * represents the generation and the column represents the backend index:
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
     * Each element in the conceptual array is a boolean indicating whether the backend at
     * this index should be picked now. If false, the counter is incremented again,
     * and the new element is checked. An atomically incremented counter keeps track of our
     * backend and generation through modular arithmetic within the pick() method.
     * <p>
     * Modular arithmetic allows us to evenly distribute picks and skips between
     * generations based on W(backend).
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
     * If we have the same three backends with weights:
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
     * <p>
     * B0    B1    B2
     * T     T     T
     * F     F     T
     * F     T     T
     * T     F     T
     * F     T     T
     * F     F     T
     * The sequence of picked backend indices is given by
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
     * <p>
     * To reduce the variance and spread the wasted work among different picks,
     * an offset that varies per backend index is also included in the calculation.
     */
700
    int pick() {
701
      while (true) {
702
        long sequence = this.nextSequence();
1✔
703
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
704
        long generation = sequence / scaledWeights.length;
1✔
705
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
706
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
707
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
708
          continue;
1✔
709
        }
710
        return backendIndex;
1✔
711
      }
712
    }
713
  }
714

715
  static final class WeightedRoundRobinLoadBalancerConfig {
716
    final long blackoutPeriodNanos;
717
    final long weightExpirationPeriodNanos;
718
    final boolean enableOobLoadReport;
719
    final long oobReportingPeriodNanos;
720
    final long weightUpdatePeriodNanos;
721
    final float errorUtilizationPenalty;
722

723
    public static Builder newBuilder() {
724
      return new Builder();
1✔
725
    }
726

727
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
728
                                                 long weightExpirationPeriodNanos,
729
                                                 boolean enableOobLoadReport,
730
                                                 long oobReportingPeriodNanos,
731
                                                 long weightUpdatePeriodNanos,
732
                                                 float errorUtilizationPenalty) {
1✔
733
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
734
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
735
      this.enableOobLoadReport = enableOobLoadReport;
1✔
736
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
737
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
738
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
739
    }
1✔
740

741
    static final class Builder {
742
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
743
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
744
      boolean enableOobLoadReport = false;
1✔
745
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
746
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
747
      float errorUtilizationPenalty = 1.0F;
1✔
748

749
      private Builder() {
1✔
750

751
      }
1✔
752

753
      @SuppressWarnings("UnusedReturnValue")
754
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
755
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
756
        return this;
1✔
757
      }
758

759
      @SuppressWarnings("UnusedReturnValue")
760
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
761
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
762
        return this;
1✔
763
      }
764

765
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
766
        this.enableOobLoadReport = enableOobLoadReport;
1✔
767
        return this;
1✔
768
      }
769

770
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
771
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
772
        return this;
1✔
773
      }
774

775
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
776
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
777
        return this;
1✔
778
      }
779

780
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
781
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
782
        return this;
1✔
783
      }
784

785
      WeightedRoundRobinLoadBalancerConfig build() {
786
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
787
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
788
                weightUpdatePeriodNanos, errorUtilizationPenalty);
789
      }
790
    }
791
  }
792
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc