• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19383

31 Jul 2024 08:32PM UTC coverage: 84.448% (+0.001%) from 84.447%
#19383

push

github

ejona86
xds: Stop extending RR in WRR

They share very little code, and we really don't want RoundRobinLb to be
public and non-final. Originally, WRR was expected to share much more
code with RR, and even delegated to RR at times. The delegation was
removed in 111ff60e. After dca89b25, most of the sharing has been moved
out into general-purpose tools that can be used by any LB policy.

FixedResultPicker now has equals, which makes it usable as an EmptyPicker
replacement. RoundRobinLb still uses EmptyPicker because fixing its
tests is a larger change. OutlierDetectionLbTest was changed because
FixedResultPicker is used by PickFirstLeafLb, and now RoundRobinLb can
squelch some of its updates for ready pickers.

33265 of 39391 relevant lines covered (84.45%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.43
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkElementIndex;
21
import static com.google.common.base.Preconditions.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.Lists;
28
import io.grpc.ConnectivityState;
29
import io.grpc.ConnectivityStateInfo;
30
import io.grpc.Deadline.Ticker;
31
import io.grpc.DoubleHistogramMetricInstrument;
32
import io.grpc.EquivalentAddressGroup;
33
import io.grpc.ExperimentalApi;
34
import io.grpc.LoadBalancer;
35
import io.grpc.LoadBalancerProvider;
36
import io.grpc.LongCounterMetricInstrument;
37
import io.grpc.MetricInstrumentRegistry;
38
import io.grpc.NameResolver;
39
import io.grpc.Status;
40
import io.grpc.SynchronizationContext;
41
import io.grpc.SynchronizationContext.ScheduledHandle;
42
import io.grpc.services.MetricReport;
43
import io.grpc.util.ForwardingLoadBalancerHelper;
44
import io.grpc.util.ForwardingSubchannel;
45
import io.grpc.util.MultiChildLoadBalancer;
46
import io.grpc.xds.orca.OrcaOobUtil;
47
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
48
import io.grpc.xds.orca.OrcaPerRequestUtil;
49
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
50
import java.util.Collection;
51
import java.util.HashMap;
52
import java.util.HashSet;
53
import java.util.List;
54
import java.util.Map;
55
import java.util.Random;
56
import java.util.Set;
57
import java.util.concurrent.ScheduledExecutorService;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicInteger;
60
import java.util.logging.Level;
61
import java.util.logging.Logger;
62

63
/**
64
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
65
 * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
66
 * determined by backend metrics using ORCA.
67
 * To use WRR, users may configure through channel serviceConfig. Example config:
68
 * <pre> {@code
69
 *       String wrrConfig = "{\"loadBalancingConfig\":" +
70
 *           "[{\"weighted_round_robin\":{\"enableOobLoadReport\":true, " +
71
 *           "\"blackoutPeriod\":\"10s\"," +
72
 *           "\"oobReportingPeriod\":\"10s\"," +
73
 *           "\"weightExpirationPeriod\":\"180s\"," +
74
 *           "\"errorUtilizationPenalty\":\"1.0\"," +
75
 *           "\"weightUpdatePeriod\":\"1s\"}}]}";
76
 *        serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
77
 *        channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
78
 *            .defaultServiceConfig(serviceConfig)
79
 *            .build();
80
 *  }
81
 *  </pre>
82
 *  Users may also configure through xDS control plane via custom lb policy. But that is much more
83
 *  complex to set up. Example config:
84
 *  <pre>
85
 *  localityLbPolicies:
86
 *   - customPolicy:
87
 *       name: weighted_round_robin
88
 *       data: '{ "enableOobLoadReport": true }'
89
 *  </pre>
90
 *  See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config
91
 */
92
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
93
final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
94

95
  private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
96
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
97
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
98
  private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
99
  private static final Logger log = Logger.getLogger(
1✔
100
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
101
  private WeightedRoundRobinLoadBalancerConfig config;
102
  private final SynchronizationContext syncContext;
103
  private final ScheduledExecutorService timeService;
104
  private ScheduledHandle weightUpdateTimer;
105
  private final Runnable updateWeightTask;
106
  private final AtomicInteger sequence;
107
  private final long infTime;
108
  private final Ticker ticker;
109
  private String locality = "";
1✔
110
  private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
111

112
  // Metric instruments are process-wide: they are registered exactly once, when this class is
  // initialized, and every WeightedRoundRobinLoadBalancer instance records into the same ones.
  static {
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    // Counts scheduler rebuilds that had to fall back to plain round-robin.
    RR_FALLBACK_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.rr_fallback",
        "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
            + "with valid weight, which caused the WRR policy to fall back to RR behavior",
        "{update}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    // Counts endpoints that are still inside their blackout period.
    ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
        "EXPERIMENTAL. Number of endpoints "
            + "from each scheduler update that don't yet have usable weight information",
        "{endpoint}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    // Counts endpoints whose last reported weight has expired.
    ENDPOINT_WEIGHT_STALE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_stale",
        "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
            + "older than the expiration period",
        "{endpoint}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    // Distribution of the weights handed to the scheduler.
    ENDPOINT_WEIGHTS_HISTOGRAM = registry.registerDoubleHistogram(
        "grpc.lb.wrr.endpoint_weights",
        "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
        "{weight}",
        Lists.newArrayList(),
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);
  }
1✔
138

139
  /**
   * Creates the balancer with a freshly seeded {@link Random} for the initial sequence number.
   * The supplied helper is wrapped first by the ORCA reporting helper and then by
   * {@link WrrHelper}.
   */
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
    this(
        new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)),
        ticker,
        new Random());
  }

  /**
   * Primary constructor: wires the helper back to this balancer and captures the time source,
   * synchronization context and scheduler taken from the helper.
   *
   * @param helper already-wrapped helper; it is told about this balancer via
   *     {@code setLoadBalancer}
   * @param ticker time source used for weight expiration and blackout bookkeeping
   * @param random supplies the starting value of the shared pick sequence
   */
  public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) {
    super(helper);
    helper.setLoadBalancer(this);

    this.ticker = checkNotNull(ticker, "ticker");
    // Sentinel timestamp stored in nonEmptySince while no usable weight has been seen.
    this.infTime = this.ticker.nanoTime() + Long.MAX_VALUE;

    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");

    this.updateWeightTask = new UpdateWeightTask();
    this.sequence = new AtomicInteger(random.nextInt());

    log.fine("weighted_round_robin LB created");
  }

  /**
   * Test-only variant of the two-argument constructor that lets the caller inject the
   * {@link Random} used to seed the pick sequence.
   */
  @VisibleForTesting
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
    this(
        new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)),
        ticker,
        random);
  }
1✔
159

160
  @Override
161
  protected ChildLbState createChildLbState(Object key, Object policyConfig,
162
      SubchannelPicker initialPicker, ResolvedAddresses unused) {
163
    ChildLbState childLbState = new WeightedChildLbState(key, pickFirstLbProvider, policyConfig,
1✔
164
        initialPicker);
165
    return childLbState;
1✔
166
  }
167

168
  @Override
169
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
170
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
171
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
172
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
173
                      + resolvedAddresses.getAddresses()
1✔
174
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
175
      handleNameResolutionError(unavailableStatus);
1✔
176
      return unavailableStatus;
1✔
177
    }
178
    String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
1✔
179
    if (locality != null) {
1✔
180
      this.locality = locality;
1✔
181
    } else {
182
      this.locality = "";
1✔
183
    }
184
    config =
1✔
185
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
186
    AcceptResolvedAddrRetVal acceptRetVal;
187
    try {
188
      resolvingAddresses = true;
1✔
189
      acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
190
      if (!acceptRetVal.status.isOk()) {
1✔
191
        return acceptRetVal.status;
×
192
      }
193

194
      if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
195
        weightUpdateTimer.cancel();
1✔
196
      }
197
      updateWeightTask.run();
1✔
198

199
      createAndApplyOrcaListeners();
1✔
200

201
      // Must update channel picker before return so that new RPCs will not be routed to deleted
202
      // clusters and resolver can remove them in service config.
203
      updateOverallBalancingState();
1✔
204

205
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
206
    } finally {
207
      resolvingAddresses = false;
1✔
208
    }
209

210
    return acceptRetVal.status;
1✔
211
  }
212

213
  /**
214
   * Updates picker with the list of active subchannels (state == READY).
215
   */
216
  @Override
217
  protected void updateOverallBalancingState() {
218
    List<ChildLbState> activeList = getReadyChildren();
1✔
219
    if (activeList.isEmpty()) {
1✔
220
      // No READY subchannels
221

222
      // MultiChildLB will request connection immediately on subchannel IDLE.
223
      boolean isConnecting = false;
1✔
224
      for (ChildLbState childLbState : getChildLbStates()) {
1✔
225
        ConnectivityState state = childLbState.getCurrentState();
1✔
226
        if (state == ConnectivityState.CONNECTING || state == ConnectivityState.IDLE) {
1✔
227
          isConnecting = true;
1✔
228
          break;
1✔
229
        }
230
      }
1✔
231

232
      if (isConnecting) {
1✔
233
        updateBalancingState(
1✔
234
            ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
235
      } else {
236
        updateBalancingState(
1✔
237
            ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
1✔
238
      }
239
    } else {
1✔
240
      updateBalancingState(ConnectivityState.READY, createReadyPicker(activeList));
1✔
241
    }
242
  }
1✔
243

244
  private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
245
    return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
246
        config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper(),
1✔
247
        locality);
248
  }
249

250
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
251
    if (state != currentConnectivityState || !picker.equals(currentPicker)) {
1✔
252
      getHelper().updateBalancingState(state, picker);
1✔
253
      currentConnectivityState = state;
1✔
254
      currentPicker = picker;
1✔
255
    }
256
  }
1✔
257

258
  @VisibleForTesting
259
  final class WeightedChildLbState extends ChildLbState {
260

261
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
262
    private volatile long lastUpdated;
263
    private volatile long nonEmptySince;
264
    private volatile double weight = 0;
1✔
265

266
    private OrcaReportListener orcaReportListener;
267

268
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Object childConfig,
269
        SubchannelPicker initialPicker) {
1✔
270
      super(key, policyProvider, childConfig, initialPicker);
1✔
271
    }
1✔
272

273
    private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
274
      if (config == null) {
1✔
275
        return 0;
×
276
      }
277
      long now = ticker.nanoTime();
1✔
278
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
279
        nonEmptySince = infTime;
1✔
280
        staleEndpoints.incrementAndGet();
1✔
281
        return 0;
1✔
282
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
283
          && config.blackoutPeriodNanos > 0) {
1✔
284
        notYetUsableEndpoints.incrementAndGet();
1✔
285
        return 0;
1✔
286
      } else {
287
        return weight;
1✔
288
      }
289
    }
290

291
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
292
      subchannels.add(wrrSubchannel);
1✔
293
    }
1✔
294

295
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
296
      if (orcaReportListener != null
1✔
297
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
298
        return orcaReportListener;
1✔
299
      }
300
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
301
      return orcaReportListener;
1✔
302
    }
303

304
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
305
      subchannels.remove(wrrSubchannel);
1✔
306
    }
1✔
307

308
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
309
      private final float errorUtilizationPenalty;
310

311
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
312
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
313
      }
1✔
314

315
      @Override
316
      public void onLoadReport(MetricReport report) {
317
        double newWeight = 0;
1✔
318
        // Prefer application utilization and fallback to CPU utilization if unset.
319
        double utilization =
320
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
321
                : report.getCpuUtilization();
1✔
322
        if (utilization > 0 && report.getQps() > 0) {
1✔
323
          double penalty = 0;
1✔
324
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
325
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
326
          }
327
          newWeight = report.getQps() / (utilization + penalty);
1✔
328
        }
329
        if (newWeight == 0) {
1✔
330
          return;
1✔
331
        }
332
        if (nonEmptySince == infTime) {
1✔
333
          nonEmptySince = ticker.nanoTime();
1✔
334
        }
335
        lastUpdated = ticker.nanoTime();
1✔
336
        weight = newWeight;
1✔
337
      }
1✔
338
    }
339
  }
340

341
  private final class UpdateWeightTask implements Runnable {
1✔
342
    @Override
343
    public void run() {
344
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
345
        ((WeightedRoundRobinPicker) currentPicker).updateWeight();
1✔
346
      }
347
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
348
          TimeUnit.NANOSECONDS, timeService);
1✔
349
    }
1✔
350
  }
351

352
  private void createAndApplyOrcaListeners() {
353
    for (ChildLbState child : getChildLbStates()) {
1✔
354
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
355
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
356
        if (config.enableOobLoadReport) {
1✔
357
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
358
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
359
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
360
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
361
                  .build());
1✔
362
        } else {
363
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
364
        }
365
      }
1✔
366
    }
1✔
367
  }
1✔
368

369
  @Override
370
  public void shutdown() {
371
    if (weightUpdateTimer != null) {
1✔
372
      weightUpdateTimer.cancel();
1✔
373
    }
374
    super.shutdown();
1✔
375
  }
1✔
376

377
  private static final class WrrHelper extends ForwardingLoadBalancerHelper {
378
    private final Helper delegate;
379
    private WeightedRoundRobinLoadBalancer wrr;
380

381
    WrrHelper(Helper helper) {
1✔
382
      this.delegate = helper;
1✔
383
    }
1✔
384

385
    void setLoadBalancer(WeightedRoundRobinLoadBalancer lb) {
386
      this.wrr = lb;
1✔
387
    }
1✔
388

389
    @Override
390
    protected Helper delegate() {
391
      return delegate;
1✔
392
    }
393

394
    @Override
395
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
396
      checkElementIndex(0, args.getAddresses().size(), "Empty address group");
1✔
397
      WeightedChildLbState childLbState =
1✔
398
          (WeightedChildLbState) wrr.getChildLbStateEag(args.getAddresses().get(0));
1✔
399
      return wrr.new WrrSubchannel(delegate().createSubchannel(args), childLbState);
1✔
400
    }
401
  }
402

403
  @VisibleForTesting
404
  final class WrrSubchannel extends ForwardingSubchannel {
405
    private final Subchannel delegate;
406
    private final WeightedChildLbState owner;
407

408
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
409
      this.delegate = checkNotNull(delegate, "delegate");
1✔
410
      this.owner = checkNotNull(owner, "owner");
1✔
411
    }
1✔
412

413
    @Override
414
    public void start(SubchannelStateListener listener) {
415
      owner.addSubchannel(this);
1✔
416
      delegate().start(new SubchannelStateListener() {
1✔
417
        @Override
418
        public void onSubchannelState(ConnectivityStateInfo newState) {
419
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
420
            owner.nonEmptySince = infTime;
1✔
421
          }
422
          listener.onSubchannelState(newState);
1✔
423
        }
1✔
424
      });
425
    }
1✔
426

427
    @Override
428
    protected Subchannel delegate() {
429
      return delegate;
1✔
430
    }
431

432
    @Override
433
    public void shutdown() {
434
      super.shutdown();
1✔
435
      owner.removeSubchannel(this);
1✔
436
    }
1✔
437
  }
438

439
  @VisibleForTesting
440
  static final class WeightedRoundRobinPicker extends SubchannelPicker {
441
    private final List<ChildLbState> children;
442
    private final Map<Subchannel, OrcaPerRequestReportListener> subchannelToReportListenerMap =
1✔
443
        new HashMap<>();
444
    private final boolean enableOobLoadReport;
445
    private final float errorUtilizationPenalty;
446
    private final AtomicInteger sequence;
447
    private final int hashCode;
448
    private final LoadBalancer.Helper helper;
449
    private final String locality;
450
    private volatile StaticStrideScheduler scheduler;
451

452
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
453
        float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper,
454
        String locality) {
1✔
455
      checkNotNull(children, "children");
1✔
456
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
457
      this.children = children;
1✔
458
      for (ChildLbState child : children) {
1✔
459
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
460
        for (WrrSubchannel subchannel : wChild.subchannels) {
1✔
461
          this.subchannelToReportListenerMap
1✔
462
              .put(subchannel, wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
463
        }
1✔
464
      }
1✔
465
      this.enableOobLoadReport = enableOobLoadReport;
1✔
466
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
467
      this.sequence = checkNotNull(sequence, "sequence");
1✔
468
      this.helper = helper;
1✔
469
      this.locality = checkNotNull(locality, "locality");
1✔
470

471
      // For equality we treat children as a set; use hash code as defined by Set
472
      int sum = 0;
1✔
473
      for (ChildLbState child : children) {
1✔
474
        sum += child.hashCode();
1✔
475
      }
1✔
476
      this.hashCode = sum
1✔
477
          ^ Boolean.hashCode(enableOobLoadReport)
1✔
478
          ^ Float.hashCode(errorUtilizationPenalty);
1✔
479

480
      updateWeight();
1✔
481
    }
1✔
482

483
    @Override
484
    public PickResult pickSubchannel(PickSubchannelArgs args) {
485
      ChildLbState childLbState = children.get(scheduler.pick());
1✔
486
      WeightedChildLbState wChild = (WeightedChildLbState) childLbState;
1✔
487
      PickResult pickResult = childLbState.getCurrentPicker().pickSubchannel(args);
1✔
488
      Subchannel subchannel = pickResult.getSubchannel();
1✔
489
      if (subchannel == null) {
1✔
490
        return pickResult;
1✔
491
      }
492
      if (!enableOobLoadReport) {
1✔
493
        return PickResult.withSubchannel(subchannel,
1✔
494
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
495
                subchannelToReportListenerMap.getOrDefault(subchannel,
1✔
496
                    wChild.getOrCreateOrcaListener(errorUtilizationPenalty))));
1✔
497
      } else {
498
        return PickResult.withSubchannel(subchannel);
1✔
499
      }
500
    }
501

502
    private void updateWeight() {
503
      float[] newWeights = new float[children.size()];
1✔
504
      AtomicInteger staleEndpoints = new AtomicInteger();
1✔
505
      AtomicInteger notYetUsableEndpoints = new AtomicInteger();
1✔
506
      for (int i = 0; i < children.size(); i++) {
1✔
507
        double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(staleEndpoints,
1✔
508
            notYetUsableEndpoints);
509
        // TODO: add locality label once available
510
        helper.getMetricRecorder()
1✔
511
            .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
1✔
512
                ImmutableList.of(helper.getChannelTarget()),
1✔
513
                ImmutableList.of(locality));
1✔
514
        newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
515
      }
516
      if (staleEndpoints.get() > 0) {
1✔
517
        // TODO: add locality label once available
518
        helper.getMetricRecorder()
1✔
519
            .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
1✔
520
                ImmutableList.of(helper.getChannelTarget()),
1✔
521
                ImmutableList.of(locality));
1✔
522
      }
523
      if (notYetUsableEndpoints.get() > 0) {
1✔
524
        // TODO: add locality label once available
525
        helper.getMetricRecorder()
1✔
526
            .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
1✔
527
                ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
1✔
528
      }
529

530
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
531
      if (this.scheduler.usesRoundRobin()) {
1✔
532
        // TODO: locality label once available
533
        helper.getMetricRecorder()
1✔
534
            .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
1✔
535
                ImmutableList.of(locality));
1✔
536
      }
537
    }
1✔
538

539
    @Override
540
    public String toString() {
541
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
542
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
543
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
544
          .add("list", children).toString();
1✔
545
    }
546

547
    @VisibleForTesting
548
    List<ChildLbState> getChildren() {
549
      return children;
1✔
550
    }
551

552
    @Override
553
    public int hashCode() {
554
      return hashCode;
×
555
    }
556

557
    @Override
558
    public boolean equals(Object o) {
559
      if (!(o instanceof WeightedRoundRobinPicker)) {
1✔
560
        return false;
×
561
      }
562
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) o;
1✔
563
      if (other == this) {
1✔
564
        return true;
×
565
      }
566
      // the lists cannot contain duplicate subchannels
567
      return hashCode == other.hashCode
1✔
568
          && sequence == other.sequence
569
          && enableOobLoadReport == other.enableOobLoadReport
570
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
571
          && children.size() == other.children.size()
1✔
572
          && new HashSet<>(children).containsAll(other.children);
1✔
573
    }
574
  }
575

576
  /*
577
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
578
   * in which each object's deadline is the multiplicative inverse of the object's weight.
579
   * <p>
580
   * The way in which this is implemented is through a static stride scheduler. 
581
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
582
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
583
   * with higher weights. It is based on the observation that the intended sequence generated 
584
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
585
   * The Static Stride Scheduler is more performant than other implementations of the EDF
586
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
587
   * <p>
588
   * go/static-stride-scheduler
589
   * <p>
590
   *
591
   * <ul>
592
   *  <li>nextSequence() - O(1)
593
   *  <li>pick() - O(n)
594
   */
595
  @VisibleForTesting
596
  static final class StaticStrideScheduler {
597
    private final short[] scaledWeights;
598
    private final AtomicInteger sequence;
599
    private final boolean usesRoundRobin;
600
    private static final int K_MAX_WEIGHT = 0xFFFF;
601

602
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
603
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
604
    //
605
    // This is done as a performance optimization by limiting the number of rounds for picks
606
    // for edge cases where channels have large differences in subchannel weights.
607
    // In this case, without these clips, it would potentially require the scheduler to
608
    // frequently traverse through the entire subchannel list within the pick method.
609
    //
610
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
611
    // decrease the number of sequences that the scheduler must traverse through in order
612
    // to pick a high weight subchannel in such corner cases.
613
    // But, it also makes WeightedRoundRobin send slightly more requests to
614
    // potentially very bad tasks (that would have near-zero weights) than zero.
615
    // This is not necessarily a downside, though. Perhaps this is not a problem at
616
    // all, and we can increase this value if needed to save CPU cycles.
617
    private static final double K_MAX_RATIO = 10;
618
    private static final double K_MIN_RATIO = 0.1;
619

620
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
621
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
622
      int numChannels = weights.length;
1✔
623
      int numWeightedChannels = 0;
1✔
624
      double sumWeight = 0;
1✔
625
      double unscaledMeanWeight;
626
      float unscaledMaxWeight = 0;
1✔
627
      for (float weight : weights) {
1✔
628
        if (weight > 0) {
1✔
629
          sumWeight += weight;
1✔
630
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
631
          numWeightedChannels++;
1✔
632
        }
633
      }
634

635
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
636
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
637
      if (numWeightedChannels > 0) {
1✔
638
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
639
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
640
      } else {
641
        // Fall back to round robin if all values are non-positives. Note that
642
        // numWeightedChannels == 1 also behaves like RR because the weights are all the same, but
643
        // the weights aren't 1, so it doesn't go through this path.
644
        unscaledMeanWeight = 1;
1✔
645
        unscaledMaxWeight = 1;
1✔
646
      }
647
      // We need at least two weights for WRR to be distinguishable from round_robin.
648
      usesRoundRobin = numWeightedChannels < 2;
1✔
649

650
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
651
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
652
      // match the actual mean of the values that end up in the scheduler.
653
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
654
      // We compute weightLowerBound and clamp it to 1 from below so that in the
655
      // worst case, we represent tiny weights as 1.
656
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
657
      short[] scaledWeights = new short[numChannels];
1✔
658
      for (int i = 0; i < numChannels; i++) {
1✔
659
        if (weights[i] <= 0) {
1✔
660
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
661
        } else {
662
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
663
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
664
        }
665
      }
666

667
      this.scaledWeights = scaledWeights;
1✔
668
      this.sequence = sequence;
1✔
669
    }
1✔
670

671
    // Without properly weighted channels, we do plain vanilla round_robin.
672
    boolean usesRoundRobin() {
673
      return usesRoundRobin;
1✔
674
    }
675

676
    /**
677
     * Returns the next sequence number and atomically increases sequence with wraparound.
678
     */
679
    private long nextSequence() {
680
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
681
    }
682

683
    /*
684
     * Selects index of next backend server.
685
     * <p>
686
     * A 2D array is compactly represented as a function of W(backend), where the row
687
     * represents the generation and the column represents the backend index:
688
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
689
     * Each element in the conceptual array is a boolean indicating whether the backend at
690
     * this index should be picked now. If false, the counter is incremented again,
691
     * and the new element is checked. An atomically incremented counter keeps track of our
692
     * backend and generation through modular arithmetic within the pick() method.
693
     * <p>
694
     * Modular arithmetic allows us to evenly distribute picks and skips between
695
     * generations based on W(backend).
696
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
697
     * If we have the same three backends with weights:
698
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
699
     * <p>
700
     * B0    B1    B2
701
     * T     T     T
702
     * F     F     T
703
     * F     T     T
704
     * T     F     T
705
     * F     T     T
706
     * F     F     T
707
     * The sequence of picked backend indices is given by
708
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
709
     * <p>
710
     * To reduce the variance and spread the wasted work among different picks,
711
     * an offset that varies per backend index is also included to the calculation.
712
     */
713
    int pick() {
714
      while (true) {
715
        long sequence = this.nextSequence();
1✔
716
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
717
        long generation = sequence / scaledWeights.length;
1✔
718
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
719
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
720
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
721
          continue;
1✔
722
        }
723
        return backendIndex;
1✔
724
      }
725
    }
726
  }
727

728
  static final class WeightedRoundRobinLoadBalancerConfig {
729
    final long blackoutPeriodNanos;
730
    final long weightExpirationPeriodNanos;
731
    final boolean enableOobLoadReport;
732
    final long oobReportingPeriodNanos;
733
    final long weightUpdatePeriodNanos;
734
    final float errorUtilizationPenalty;
735

736
    public static Builder newBuilder() {
737
      return new Builder();
1✔
738
    }
739

740
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
741
                                                 long weightExpirationPeriodNanos,
742
                                                 boolean enableOobLoadReport,
743
                                                 long oobReportingPeriodNanos,
744
                                                 long weightUpdatePeriodNanos,
745
                                                 float errorUtilizationPenalty) {
1✔
746
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
747
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
748
      this.enableOobLoadReport = enableOobLoadReport;
1✔
749
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
750
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
751
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
752
    }
1✔
753

754
    static final class Builder {
755
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
756
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
757
      boolean enableOobLoadReport = false;
1✔
758
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
759
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
760
      float errorUtilizationPenalty = 1.0F;
1✔
761

762
      private Builder() {
1✔
763

764
      }
1✔
765

766
      @SuppressWarnings("UnusedReturnValue")
767
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
768
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
769
        return this;
1✔
770
      }
771

772
      @SuppressWarnings("UnusedReturnValue")
773
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
774
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
775
        return this;
1✔
776
      }
777

778
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
779
        this.enableOobLoadReport = enableOobLoadReport;
1✔
780
        return this;
1✔
781
      }
782

783
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
784
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
785
        return this;
1✔
786
      }
787

788
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
789
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
790
        return this;
1✔
791
      }
792

793
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
794
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
795
        return this;
1✔
796
      }
797

798
      WeightedRoundRobinLoadBalancerConfig build() {
799
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
800
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
801
                weightUpdatePeriodNanos, errorUtilizationPenalty);
802
      }
803
    }
804
  }
805
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc