• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19440

29 Aug 2024 03:04PM UTC coverage: 84.491% (-0.002%) from 84.493%
#19440

push

github

ejona86
util: Remove child policy config from MultiChildLB state

The child policy config should be refreshed every address update, so it
shouldn't be stored in the ChildLbState. In addition, none of the
current usages actually used what was stored in the ChildLbState in a
meaningful way (it was always null).

ResolvedAddresses was also removed from createChildLbState(), as nothing
in it should be needed for creation; it varies over time and the values
passed at creation are immutable.

33402 of 39533 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.08
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.Lists;
27
import io.grpc.ConnectivityState;
28
import io.grpc.ConnectivityStateInfo;
29
import io.grpc.Deadline.Ticker;
30
import io.grpc.DoubleHistogramMetricInstrument;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.LongCounterMetricInstrument;
35
import io.grpc.MetricInstrumentRegistry;
36
import io.grpc.NameResolver;
37
import io.grpc.Status;
38
import io.grpc.SynchronizationContext;
39
import io.grpc.SynchronizationContext.ScheduledHandle;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingSubchannel;
42
import io.grpc.util.MultiChildLoadBalancer;
43
import io.grpc.xds.orca.OrcaOobUtil;
44
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
45
import io.grpc.xds.orca.OrcaPerRequestUtil;
46
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
47
import java.util.ArrayList;
48
import java.util.Collection;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Random;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.TimeUnit;
55
import java.util.concurrent.atomic.AtomicInteger;
56
import java.util.logging.Level;
57
import java.util.logging.Logger;
58

59
/**
60
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
61
 * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
62
 * determined by backend metrics using ORCA.
63
 * To use WRR, users may configure through channel serviceConfig. Example config:
64
 * <pre> {@code
65
 *       String wrrConfig = "{\"loadBalancingConfig\":" +
66
 *           "[{\"weighted_round_robin\":{\"enableOobLoadReport\":true, " +
67
 *           "\"blackoutPeriod\":\"10s\"," +
68
 *           "\"oobReportingPeriod\":\"10s\"," +
69
 *           "\"weightExpirationPeriod\":\"180s\"," +
70
 *           "\"errorUtilizationPenalty\":\"1.0\"," +
71
 *           "\"weightUpdatePeriod\":\"1s\"}}]}";
72
 *        serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
73
 *        channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
74
 *            .defaultServiceConfig(serviceConfig)
75
 *            .build();
76
 *  }
77
 *  </pre>
78
 *  Users may also configure through xDS control plane via custom lb policy. But that is much more
79
 *  complex to set up. Example config:
80
 *  <pre>
81
 *  localityLbPolicies:
82
 *   - customPolicy:
83
 *       name: weighted_round_robin
84
 *       data: '{ "enableOobLoadReport": true }'
85
 *  </pre>
86
 *  See related documentation: https://cloud.google.com/service-mesh/legacy/load-balancing-apis/proxyless-configure-advanced-traffic-management#custom-lb-config
87
 */
88
final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
89

90
  private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
91
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
92
  private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
93
  private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
94
  private static final Logger log = Logger.getLogger(
1✔
95
      WeightedRoundRobinLoadBalancer.class.getName());
1✔
96
  private WeightedRoundRobinLoadBalancerConfig config;
97
  private final SynchronizationContext syncContext;
98
  private final ScheduledExecutorService timeService;
99
  private ScheduledHandle weightUpdateTimer;
100
  private final Runnable updateWeightTask;
101
  private final AtomicInteger sequence;
102
  private final long infTime;
103
  private final Ticker ticker;
104
  private String locality = "";
1✔
105
  private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
106

107
  // Registers the metric instruments once per class load; all instances of this LB record into
  // the same instruments.
  static {
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    RR_FALLBACK_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.rr_fallback",
        "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
            + "with valid weight, which caused the WRR policy to fall back to RR behavior",
        "{update}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
        "EXPERIMENTAL. Number of endpoints "
            + "from each scheduler update that don't yet have usable weight information",
        "{endpoint}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    ENDPOINT_WEIGHT_STALE_COUNTER = registry.registerLongCounter(
        "grpc.lb.wrr.endpoint_weight_stale",
        "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
            + "older than the expiration period",
        "{endpoint}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);

    ENDPOINT_WEIGHTS_HISTOGRAM = registry.registerDoubleHistogram(
        "grpc.lb.wrr.endpoint_weights",
        "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
        "{weight}",
        Lists.newArrayList(),
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.locality"),
        false);
  }
133

134
  /**
   * Creates a weighted-round-robin balancer whose pick sequence starts at a random offset.
   *
   * @param helper the channel-provided LB helper
   * @param ticker time source used for weight expiration and blackout checks
   */
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
    this(helper, ticker, new Random());
  }
137

138
  @VisibleForTesting
139
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
140
    super(OrcaOobUtil.newOrcaReportingHelper(helper));
1✔
141
    this.ticker = checkNotNull(ticker, "ticker");
1✔
142
    this.infTime = ticker.nanoTime() + Long.MAX_VALUE;
1✔
143
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
144
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
145
    this.updateWeightTask = new UpdateWeightTask();
1✔
146
    this.sequence = new AtomicInteger(random.nextInt());
1✔
147
    log.log(Level.FINE, "weighted_round_robin LB created");
1✔
148
  }
1✔
149

150
  @Override
151
  protected ChildLbState createChildLbState(Object key) {
152
    return new WeightedChildLbState(key, pickFirstLbProvider);
1✔
153
  }
154

155
  @Override
156
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
157
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
158
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
159
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
160
                      + resolvedAddresses.getAddresses()
1✔
161
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
162
      handleNameResolutionError(unavailableStatus);
1✔
163
      return unavailableStatus;
1✔
164
    }
165
    String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
1✔
166
    if (locality != null) {
1✔
167
      this.locality = locality;
1✔
168
    } else {
169
      this.locality = "";
1✔
170
    }
171
    config =
1✔
172
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
173
    AcceptResolvedAddrRetVal acceptRetVal;
174
    try {
175
      resolvingAddresses = true;
1✔
176
      acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
177
      if (!acceptRetVal.status.isOk()) {
1✔
178
        return acceptRetVal.status;
×
179
      }
180

181
      if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
182
        weightUpdateTimer.cancel();
1✔
183
      }
184
      updateWeightTask.run();
1✔
185

186
      createAndApplyOrcaListeners();
1✔
187

188
      // Must update channel picker before return so that new RPCs will not be routed to deleted
189
      // clusters and resolver can remove them in service config.
190
      updateOverallBalancingState();
1✔
191

192
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
193
    } finally {
194
      resolvingAddresses = false;
1✔
195
    }
196

197
    return acceptRetVal.status;
1✔
198
  }
199

200
  /**
201
   * Updates picker with the list of active subchannels (state == READY).
202
   */
203
  @Override
204
  protected void updateOverallBalancingState() {
205
    List<ChildLbState> activeList = getReadyChildren();
1✔
206
    if (activeList.isEmpty()) {
1✔
207
      // No READY subchannels
208

209
      // MultiChildLB will request connection immediately on subchannel IDLE.
210
      boolean isConnecting = false;
1✔
211
      for (ChildLbState childLbState : getChildLbStates()) {
1✔
212
        ConnectivityState state = childLbState.getCurrentState();
1✔
213
        if (state == ConnectivityState.CONNECTING || state == ConnectivityState.IDLE) {
1✔
214
          isConnecting = true;
1✔
215
          break;
1✔
216
        }
217
      }
1✔
218

219
      if (isConnecting) {
1✔
220
        updateBalancingState(
1✔
221
            ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
222
      } else {
223
        updateBalancingState(
1✔
224
            ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
1✔
225
      }
226
    } else {
1✔
227
      updateBalancingState(ConnectivityState.READY, createReadyPicker(activeList));
1✔
228
    }
229
  }
1✔
230

231
  private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
232
    WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
1✔
233
        config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
234
    updateWeight(picker);
1✔
235
    return picker;
1✔
236
  }
237

238
  private void updateWeight(WeightedRoundRobinPicker picker) {
239
    Helper helper = getHelper();
1✔
240
    float[] newWeights = new float[picker.children.size()];
1✔
241
    AtomicInteger staleEndpoints = new AtomicInteger();
1✔
242
    AtomicInteger notYetUsableEndpoints = new AtomicInteger();
1✔
243
    for (int i = 0; i < picker.children.size(); i++) {
1✔
244
      double newWeight = ((WeightedChildLbState) picker.children.get(i)).getWeight(staleEndpoints,
1✔
245
          notYetUsableEndpoints);
246
      helper.getMetricRecorder()
1✔
247
          .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
1✔
248
              ImmutableList.of(helper.getChannelTarget()),
1✔
249
              ImmutableList.of(locality));
1✔
250
      newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
1✔
251
    }
252

253
    if (staleEndpoints.get() > 0) {
1✔
254
      helper.getMetricRecorder()
1✔
255
          .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
1✔
256
              ImmutableList.of(helper.getChannelTarget()),
1✔
257
              ImmutableList.of(locality));
1✔
258
    }
259
    if (notYetUsableEndpoints.get() > 0) {
1✔
260
      helper.getMetricRecorder()
1✔
261
          .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
1✔
262
              ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
1✔
263
    }
264
    boolean weightsEffective = picker.updateWeight(newWeights);
1✔
265
    if (!weightsEffective) {
1✔
266
      helper.getMetricRecorder()
1✔
267
          .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
1✔
268
              ImmutableList.of(locality));
1✔
269
    }
270
  }
1✔
271

272
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
273
    if (state != currentConnectivityState || !picker.equals(currentPicker)) {
1✔
274
      getHelper().updateBalancingState(state, picker);
1✔
275
      currentConnectivityState = state;
1✔
276
      currentPicker = picker;
1✔
277
    }
278
  }
1✔
279

280
  @VisibleForTesting
281
  final class WeightedChildLbState extends ChildLbState {
282

283
    private final Set<WrrSubchannel> subchannels = new HashSet<>();
1✔
284
    private volatile long lastUpdated;
285
    private volatile long nonEmptySince;
286
    private volatile double weight = 0;
1✔
287

288
    private OrcaReportListener orcaReportListener;
289

290
    public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider) {
1✔
291
      super(key, policyProvider);
1✔
292
    }
1✔
293

294
    @Override
295
    protected ChildLbStateHelper createChildHelper() {
296
      return new WrrChildLbStateHelper();
1✔
297
    }
298

299
    private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
300
      if (config == null) {
1✔
301
        return 0;
×
302
      }
303
      long now = ticker.nanoTime();
1✔
304
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
305
        nonEmptySince = infTime;
1✔
306
        staleEndpoints.incrementAndGet();
1✔
307
        return 0;
1✔
308
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
309
          && config.blackoutPeriodNanos > 0) {
1✔
310
        notYetUsableEndpoints.incrementAndGet();
1✔
311
        return 0;
1✔
312
      } else {
313
        return weight;
1✔
314
      }
315
    }
316

317
    public void addSubchannel(WrrSubchannel wrrSubchannel) {
318
      subchannels.add(wrrSubchannel);
1✔
319
    }
1✔
320

321
    public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty) {
322
      if (orcaReportListener != null
1✔
323
          && orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty) {
1✔
324
        return orcaReportListener;
1✔
325
      }
326
      orcaReportListener = new OrcaReportListener(errorUtilizationPenalty);
1✔
327
      return orcaReportListener;
1✔
328
    }
329

330
    public void removeSubchannel(WrrSubchannel wrrSubchannel) {
331
      subchannels.remove(wrrSubchannel);
1✔
332
    }
1✔
333

334
    final class WrrChildLbStateHelper extends ChildLbStateHelper {
1✔
335
      @Override
336
      public Subchannel createSubchannel(CreateSubchannelArgs args) {
337
        return new WrrSubchannel(super.createSubchannel(args), WeightedChildLbState.this);
1✔
338
      }
339

340
      @Override
341
      public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
342
        super.updateBalancingState(newState, newPicker);
1✔
343
        if (!resolvingAddresses && newState == ConnectivityState.IDLE) {
1✔
344
          getLb().requestConnection();
×
345
        }
346
      }
1✔
347
    }
348

349
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
350
      private final float errorUtilizationPenalty;
351

352
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
353
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
354
      }
1✔
355

356
      @Override
357
      public void onLoadReport(MetricReport report) {
358
        double newWeight = 0;
1✔
359
        // Prefer application utilization and fallback to CPU utilization if unset.
360
        double utilization =
361
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
362
                : report.getCpuUtilization();
1✔
363
        if (utilization > 0 && report.getQps() > 0) {
1✔
364
          double penalty = 0;
1✔
365
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
366
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
367
          }
368
          newWeight = report.getQps() / (utilization + penalty);
1✔
369
        }
370
        if (newWeight == 0) {
1✔
371
          return;
1✔
372
        }
373
        if (nonEmptySince == infTime) {
1✔
374
          nonEmptySince = ticker.nanoTime();
1✔
375
        }
376
        lastUpdated = ticker.nanoTime();
1✔
377
        weight = newWeight;
1✔
378
      }
1✔
379
    }
380
  }
381

382
  private final class UpdateWeightTask implements Runnable {
1✔
383
    @Override
384
    public void run() {
385
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
386
        updateWeight((WeightedRoundRobinPicker) currentPicker);
1✔
387
      }
388
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
389
          TimeUnit.NANOSECONDS, timeService);
1✔
390
    }
1✔
391
  }
392

393
  private void createAndApplyOrcaListeners() {
394
    for (ChildLbState child : getChildLbStates()) {
1✔
395
      WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
396
      for (WrrSubchannel weightedSubchannel : wChild.subchannels) {
1✔
397
        if (config.enableOobLoadReport) {
1✔
398
          OrcaOobUtil.setListener(weightedSubchannel,
1✔
399
              wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty),
1✔
400
              OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
401
                  .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
402
                  .build());
1✔
403
        } else {
404
          OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
405
        }
406
      }
1✔
407
    }
1✔
408
  }
1✔
409

410
  @Override
411
  public void shutdown() {
412
    if (weightUpdateTimer != null) {
1✔
413
      weightUpdateTimer.cancel();
1✔
414
    }
415
    super.shutdown();
1✔
416
  }
1✔
417

418
  @VisibleForTesting
419
  final class WrrSubchannel extends ForwardingSubchannel {
420
    private final Subchannel delegate;
421
    private final WeightedChildLbState owner;
422

423
    WrrSubchannel(Subchannel delegate, WeightedChildLbState owner) {
1✔
424
      this.delegate = checkNotNull(delegate, "delegate");
1✔
425
      this.owner = checkNotNull(owner, "owner");
1✔
426
    }
1✔
427

428
    @Override
429
    public void start(SubchannelStateListener listener) {
430
      owner.addSubchannel(this);
1✔
431
      delegate().start(new SubchannelStateListener() {
1✔
432
        @Override
433
        public void onSubchannelState(ConnectivityStateInfo newState) {
434
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
435
            owner.nonEmptySince = infTime;
1✔
436
          }
437
          listener.onSubchannelState(newState);
1✔
438
        }
1✔
439
      });
440
    }
1✔
441

442
    @Override
443
    protected Subchannel delegate() {
444
      return delegate;
1✔
445
    }
446

447
    @Override
448
    public void shutdown() {
449
      super.shutdown();
1✔
450
      owner.removeSubchannel(this);
1✔
451
    }
1✔
452
  }
453

454
  @VisibleForTesting
455
  static final class WeightedRoundRobinPicker extends SubchannelPicker {
456
    // Parallel lists (column-based storage instead of normal row-based storage of List<Struct>).
457
    // The ith element of children corresponds to the ith element of pickers, listeners, and even
458
    // updateWeight(float[]).
459
    private final List<ChildLbState> children; // May only be accessed from sync context
460
    private final List<SubchannelPicker> pickers;
461
    private final List<OrcaPerRequestReportListener> reportListeners;
462
    private final boolean enableOobLoadReport;
463
    private final float errorUtilizationPenalty;
464
    private final AtomicInteger sequence;
465
    private final int hashCode;
466
    private volatile StaticStrideScheduler scheduler;
467

468
    WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
469
        float errorUtilizationPenalty, AtomicInteger sequence) {
1✔
470
      checkNotNull(children, "children");
1✔
471
      Preconditions.checkArgument(!children.isEmpty(), "empty child list");
1✔
472
      this.children = children;
1✔
473
      List<SubchannelPicker> pickers = new ArrayList<>(children.size());
1✔
474
      List<OrcaPerRequestReportListener> reportListeners = new ArrayList<>(children.size());
1✔
475
      for (ChildLbState child : children) {
1✔
476
        WeightedChildLbState wChild = (WeightedChildLbState) child;
1✔
477
        pickers.add(wChild.getCurrentPicker());
1✔
478
        reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty));
1✔
479
      }
1✔
480
      this.pickers = pickers;
1✔
481
      this.reportListeners = reportListeners;
1✔
482
      this.enableOobLoadReport = enableOobLoadReport;
1✔
483
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
484
      this.sequence = checkNotNull(sequence, "sequence");
1✔
485

486
      // For equality we treat pickers as a set; use hash code as defined by Set
487
      int sum = 0;
1✔
488
      for (SubchannelPicker picker : pickers) {
1✔
489
        sum += picker.hashCode();
1✔
490
      }
1✔
491
      this.hashCode = sum
1✔
492
          ^ Boolean.hashCode(enableOobLoadReport)
1✔
493
          ^ Float.hashCode(errorUtilizationPenalty);
1✔
494
    }
1✔
495

496
    @Override
497
    public PickResult pickSubchannel(PickSubchannelArgs args) {
498
      int pick = scheduler.pick();
1✔
499
      PickResult pickResult = pickers.get(pick).pickSubchannel(args);
1✔
500
      Subchannel subchannel = pickResult.getSubchannel();
1✔
501
      if (subchannel == null) {
1✔
502
        return pickResult;
1✔
503
      }
504
      if (!enableOobLoadReport) {
1✔
505
        return PickResult.withSubchannel(subchannel,
1✔
506
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
507
                reportListeners.get(pick)));
1✔
508
      } else {
509
        return PickResult.withSubchannel(subchannel);
1✔
510
      }
511
    }
512

513
    /** Returns {@code true} if weights are different than round_robin. */
514
    private boolean updateWeight(float[] newWeights) {
515
      this.scheduler = new StaticStrideScheduler(newWeights, sequence);
1✔
516
      return !this.scheduler.usesRoundRobin();
1✔
517
    }
518

519
    @Override
520
    public String toString() {
521
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
522
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
523
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
524
          .add("pickers", pickers)
1✔
525
          .toString();
1✔
526
    }
527

528
    @VisibleForTesting
529
    List<ChildLbState> getChildren() {
530
      return children;
1✔
531
    }
532

533
    @Override
534
    public int hashCode() {
535
      return hashCode;
×
536
    }
537

538
    @Override
539
    public boolean equals(Object o) {
540
      if (!(o instanceof WeightedRoundRobinPicker)) {
1✔
541
        return false;
×
542
      }
543
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) o;
1✔
544
      if (other == this) {
1✔
545
        return true;
×
546
      }
547
      // the lists cannot contain duplicate subchannels
548
      return hashCode == other.hashCode
1✔
549
          && sequence == other.sequence
550
          && enableOobLoadReport == other.enableOobLoadReport
551
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
552
          && pickers.size() == other.pickers.size()
1✔
553
          && new HashSet<>(pickers).containsAll(other.pickers);
1✔
554
    }
555
  }
556

557
  /*
558
   * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
559
   * in which each object's deadline is the multiplicative inverse of the object's weight.
560
   * <p>
561
   * The way in which this is implemented is through a static stride scheduler. 
562
   * The Static Stride Scheduler works by iterating through the list of subchannel weights
563
   * and using modular arithmetic to proportionally distribute picks, favoring entries 
564
   * with higher weights. It is based on the observation that the intended sequence generated 
565
   * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic. 
566
   * The Static Stride Scheduler is more performant than other implementations of the EDF
567
   * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
568
   * <p>
569
   * go/static-stride-scheduler
570
   * <p>
571
   *
572
   * <ul>
573
   *  <li>nextSequence() - O(1)
574
   *  <li>pick() - O(n)
575
   */
576
  @VisibleForTesting
577
  static final class StaticStrideScheduler {
578
    private final short[] scaledWeights;
579
    private final AtomicInteger sequence;
580
    private final boolean usesRoundRobin;
581
    private static final int K_MAX_WEIGHT = 0xFFFF;
582

583
    // Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
584
    // weights bigger than M*kMaxRatio and weights smaller than M*kMinRatio.
585
    //
586
    // This is done as a performance optimization by limiting the number of rounds for picks
587
    // for edge cases where channels have large differences in subchannel weights.
588
    // In this case, without these clips, it would potentially require the scheduler to
589
    // frequently traverse through the entire subchannel list within the pick method.
590
    //
591
    // The current values of 10 and 0.1 were chosen without any experimenting. It should
592
    // decrease the amount of sequences that the scheduler must traverse through in order
593
    // to pick a high weight subchannel in such corner cases.
594
    // But, it also makes WeightedRoundRobin to send slightly more requests to
595
    // potentially very bad tasks (that would have near-zero weights) than zero.
596
    // This is not necessarily a downside, though. Perhaps this is not a problem at
597
    // all, and we can increase this value if needed to save CPU cycles.
598
    private static final double K_MAX_RATIO = 10;
599
    private static final double K_MIN_RATIO = 0.1;
600

601
    StaticStrideScheduler(float[] weights, AtomicInteger sequence) {
1✔
602
      checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
1✔
603
      int numChannels = weights.length;
1✔
604
      int numWeightedChannels = 0;
1✔
605
      double sumWeight = 0;
1✔
606
      double unscaledMeanWeight;
607
      float unscaledMaxWeight = 0;
1✔
608
      for (float weight : weights) {
1✔
609
        if (weight > 0) {
1✔
610
          sumWeight += weight;
1✔
611
          unscaledMaxWeight = Math.max(weight, unscaledMaxWeight);
1✔
612
          numWeightedChannels++;
1✔
613
        }
614
      }
615

616
      // Adjust max value s.t. ratio does not exceed K_MAX_RATIO. This should
617
      // ensure that we on average do at most K_MAX_RATIO rounds for picks.
618
      if (numWeightedChannels > 0) {
1✔
619
        unscaledMeanWeight = sumWeight / numWeightedChannels;
1✔
620
        unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
1✔
621
      } else {
622
        // Fall back to round robin if all values are non-positives. Note that
623
        // numWeightedChannels == 1 also behaves like RR because the weights are all the same, but
624
        // the weights aren't 1, so it doesn't go through this path.
625
        unscaledMeanWeight = 1;
1✔
626
        unscaledMaxWeight = 1;
1✔
627
      }
628
      // We need at least two weights for WRR to be distinguishable from round_robin.
629
      usesRoundRobin = numWeightedChannels < 2;
1✔
630

631
      // Scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly.
632
      // Note that, since we cap the weights to stay within K_MAX_RATIO, meanWeight might not
633
      // match the actual mean of the values that end up in the scheduler.
634
      double scalingFactor = K_MAX_WEIGHT / unscaledMaxWeight;
1✔
635
      // We compute weightLowerBound and clamp it to 1 from below so that in the
636
      // worst case, we represent tiny weights as 1.
637
      int weightLowerBound = (int) Math.ceil(scalingFactor * unscaledMeanWeight * K_MIN_RATIO);
1✔
638
      short[] scaledWeights = new short[numChannels];
1✔
639
      for (int i = 0; i < numChannels; i++) {
1✔
640
        if (weights[i] <= 0) {
1✔
641
          scaledWeights[i] = (short) Math.round(scalingFactor * unscaledMeanWeight);
1✔
642
        } else {
643
          int weight = (int) Math.round(scalingFactor * Math.min(weights[i], unscaledMaxWeight));
1✔
644
          scaledWeights[i] = (short) Math.max(weight, weightLowerBound);
1✔
645
        }
646
      }
647

648
      this.scaledWeights = scaledWeights;
1✔
649
      this.sequence = sequence;
1✔
650
    }
1✔
651

652
    // Without properly weighted channels, we do plain vanilla round_robin.
653
    boolean usesRoundRobin() {
654
      return usesRoundRobin;
1✔
655
    }
656

657
    /**
658
     * Returns the next sequence number and atomically increases sequence with wraparound.
659
     */
660
    private long nextSequence() {
661
      return Integer.toUnsignedLong(sequence.getAndIncrement());
1✔
662
    }
663

664
    /*
665
     * Selects index of next backend server.
666
     * <p>
667
     * A 2D array is compactly represented as a function of W(backend), where the row
668
     * represents the generation and the column represents the backend index:
669
     * X(backend,generation) | generation ∈ [0,kMaxWeight).
670
     * Each element in the conceptual array is a boolean indicating whether the backend at
671
     * this index should be picked now. If false, the counter is incremented again,
672
     * and the new element is checked. An atomically incremented counter keeps track of our
673
     * backend and generation through modular arithmetic within the pick() method.
674
     * <p>
675
     * Modular arithmetic allows us to evenly distribute picks and skips between
676
     * generations based on W(backend).
677
     * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
678
     * If we have the same three backends with weights:
679
     * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
680
     * <p>
681
     * B0    B1    B2
682
     * T     T     T
683
     * F     F     T
684
     * F     T     T
685
     * T     F     T
686
     * F     T     T
687
     * F     F     T
688
     * The sequence of picked backend indices is given by
689
     * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
690
     * <p>
691
     * To reduce the variance and spread the wasted work among different picks,
692
     * an offset that varies per backend index is also included to the calculation.
693
     */
694
    int pick() {
695
      while (true) {
696
        long sequence = this.nextSequence();
1✔
697
        int backendIndex = (int) (sequence % scaledWeights.length);
1✔
698
        long generation = sequence / scaledWeights.length;
1✔
699
        int weight = Short.toUnsignedInt(scaledWeights[backendIndex]);
1✔
700
        long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
1✔
701
        if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
1✔
702
          continue;
1✔
703
        }
704
        return backendIndex;
1✔
705
      }
706
    }
707
  }
708

709
  static final class WeightedRoundRobinLoadBalancerConfig {
710
    final long blackoutPeriodNanos;
711
    final long weightExpirationPeriodNanos;
712
    final boolean enableOobLoadReport;
713
    final long oobReportingPeriodNanos;
714
    final long weightUpdatePeriodNanos;
715
    final float errorUtilizationPenalty;
716

717
    public static Builder newBuilder() {
718
      return new Builder();
1✔
719
    }
720

721
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
722
                                                 long weightExpirationPeriodNanos,
723
                                                 boolean enableOobLoadReport,
724
                                                 long oobReportingPeriodNanos,
725
                                                 long weightUpdatePeriodNanos,
726
                                                 float errorUtilizationPenalty) {
1✔
727
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
728
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
729
      this.enableOobLoadReport = enableOobLoadReport;
1✔
730
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
731
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
732
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
733
    }
1✔
734

735
    static final class Builder {
736
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
737
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
738
      boolean enableOobLoadReport = false;
1✔
739
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
740
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
741
      float errorUtilizationPenalty = 1.0F;
1✔
742

743
      private Builder() {
1✔
744

745
      }
1✔
746

747
      @SuppressWarnings("UnusedReturnValue")
748
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
749
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
750
        return this;
1✔
751
      }
752

753
      @SuppressWarnings("UnusedReturnValue")
754
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
755
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
756
        return this;
1✔
757
      }
758

759
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
760
        this.enableOobLoadReport = enableOobLoadReport;
1✔
761
        return this;
1✔
762
      }
763

764
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
765
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
766
        return this;
1✔
767
      }
768

769
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
770
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
771
        return this;
1✔
772
      }
773

774
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
775
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
776
        return this;
1✔
777
      }
778

779
      WeightedRoundRobinLoadBalancerConfig build() {
780
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
781
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
782
                weightUpdatePeriodNanos, errorUtilizationPenalty);
783
      }
784
    }
785
  }
786
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc