• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18665

pending completion
#18665

push

github-actions

ejona86
services, xds, orca: use application_utilization and fallback to cpu_utilization if unset in WRR (#10256)

Implements updates to [A51][] and [A58][].

Imported cncf/xds using import.sh script.

A51: https://github.com/grpc/proposal/pull/374
A58: https://github.com/grpc/proposal/pull/373

30952 of 35074 relevant lines covered (88.25%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.47
/../xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Preconditions;
25
import io.grpc.ConnectivityState;
26
import io.grpc.ConnectivityStateInfo;
27
import io.grpc.Deadline.Ticker;
28
import io.grpc.EquivalentAddressGroup;
29
import io.grpc.ExperimentalApi;
30
import io.grpc.LoadBalancer;
31
import io.grpc.NameResolver;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.SynchronizationContext.ScheduledHandle;
35
import io.grpc.services.MetricReport;
36
import io.grpc.util.ForwardingLoadBalancerHelper;
37
import io.grpc.util.ForwardingSubchannel;
38
import io.grpc.util.RoundRobinLoadBalancer;
39
import io.grpc.xds.orca.OrcaOobUtil;
40
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
41
import io.grpc.xds.orca.OrcaPerRequestUtil;
42
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
43
import java.util.HashMap;
44
import java.util.HashSet;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.PriorityQueue;
48
import java.util.Random;
49
import java.util.concurrent.ScheduledExecutorService;
50
import java.util.concurrent.TimeUnit;
51
import java.util.logging.Level;
52
import java.util.logging.Logger;
53

54
/**
55
 * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over
56
 * the {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
57
 * determined by backend metrics using ORCA.
58
 */
59
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
60
final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
61
  private static final Logger log = Logger.getLogger(
      WeightedRoundRobinLoadBalancer.class.getName());

  // Collaborators fixed at construction time.
  private final SynchronizationContext syncContext;
  private final ScheduledExecutorService timeService;
  private final Runnable updateWeightTask;
  private final Random random;
  private final Ticker ticker;
  /**
   * Sentinel stored in {@code WrrSubchannel.nonEmptySince} to mean "no load report since the last
   * (re)connect or expiry". It is construction time plus {@code Long.MAX_VALUE}, so
   * {@code now - infTime} is hugely negative under two's-complement difference arithmetic.
   */
  private final long infTime;

  // Mutable state: config is replaced by acceptResolvedAddresses(), the timer handle by the
  // self-rescheduling weight update task.
  private WeightedRoundRobinLoadBalancerConfig config;
  private ScheduledHandle weightUpdateTimer;

  /**
   * Creates a balancer that wraps {@code helper} for ORCA reporting and uses a fresh
   * {@link Random} for scheduler start offsets.
   */
  public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
    // Resolves to the (Helper, Ticker, Random) overload, which performs the ORCA/WRR wrapping.
    this(helper, ticker, new Random());
  }

  /**
   * Primary constructor.
   *
   * @param helper already-wrapped helper; it is told about this balancer so that subchannels it
   *     creates become {@link WrrSubchannel}s
   * @param ticker time source for weight expiry and blackout tracking; must not be null
   * @param random randomness used to offset initial scheduler deadlines
   */
  public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) {
    super(helper);
    helper.setLoadBalancer(this);
    this.ticker = checkNotNull(ticker, "ticker");
    this.infTime = this.ticker.nanoTime() + Long.MAX_VALUE;
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
    this.random = random;
    this.updateWeightTask = new UpdateWeightTask();
    log.log(Level.FINE, "weighted_round_robin LB created");
  }

  /** Test hook: wraps a plain {@link Helper} and accepts an injected {@link Random}. */
  @VisibleForTesting
  WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
    this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, random);
  }
1✔
92

93
  @Override
94
  public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
95
    if (resolvedAddresses.getLoadBalancingPolicyConfig() == null) {
1✔
96
      handleNameResolutionError(Status.UNAVAILABLE.withDescription(
1✔
97
              "NameResolver returned no WeightedRoundRobinLoadBalancerConfig. addrs="
98
                      + resolvedAddresses.getAddresses()
1✔
99
                      + ", attrs=" + resolvedAddresses.getAttributes()));
1✔
100
      return false;
1✔
101
    }
102
    config =
1✔
103
            (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
104
    boolean accepted = super.acceptResolvedAddresses(resolvedAddresses);
1✔
105
    if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
1✔
106
      weightUpdateTimer.cancel();
1✔
107
    }
108
    updateWeightTask.run();
1✔
109
    afterAcceptAddresses();
1✔
110
    return accepted;
1✔
111
  }
112

113
  @Override
114
  public RoundRobinPicker createReadyPicker(List<Subchannel> activeList) {
115
    return new WeightedRoundRobinPicker(activeList, config.enableOobLoadReport,
1✔
116
        config.errorUtilizationPenalty);
117
  }
118

119
  private final class UpdateWeightTask implements Runnable {
1✔
120
    @Override
121
    public void run() {
122
      if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
1✔
123
        ((WeightedRoundRobinPicker)currentPicker).updateWeight();
1✔
124
      }
125
      weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
1✔
126
          TimeUnit.NANOSECONDS, timeService);
1✔
127
    }
1✔
128
  }
129

130
  private void afterAcceptAddresses() {
131
    for (Subchannel subchannel : getSubchannels()) {
1✔
132
      WrrSubchannel weightedSubchannel = (WrrSubchannel) subchannel;
1✔
133
      if (config.enableOobLoadReport) {
1✔
134
        OrcaOobUtil.setListener(weightedSubchannel,
1✔
135
            weightedSubchannel.new OrcaReportListener(config.errorUtilizationPenalty),
136
                OrcaOobUtil.OrcaReportingConfig.newBuilder()
1✔
137
                        .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
1✔
138
                        .build());
1✔
139
      } else {
140
        OrcaOobUtil.setListener(weightedSubchannel, null, null);
1✔
141
      }
142
    }
1✔
143
  }
1✔
144

145
  @Override
146
  public void shutdown() {
147
    if (weightUpdateTimer != null) {
1✔
148
      weightUpdateTimer.cancel();
1✔
149
    }
150
    super.shutdown();
1✔
151
  }
1✔
152

153
  private static final class WrrHelper extends ForwardingLoadBalancerHelper {
154
    private final Helper delegate;
155
    private WeightedRoundRobinLoadBalancer wrr;
156

157
    WrrHelper(Helper helper) {
1✔
158
      this.delegate = helper;
1✔
159
    }
1✔
160

161
    void setLoadBalancer(WeightedRoundRobinLoadBalancer lb) {
162
      this.wrr = lb;
1✔
163
    }
1✔
164

165
    @Override
166
    protected Helper delegate() {
167
      return delegate;
1✔
168
    }
169

170
    @Override
171
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
172
      return wrr.new WrrSubchannel(delegate().createSubchannel(args));
1✔
173
    }
174
  }
175

176
  @VisibleForTesting
177
  final class WrrSubchannel extends ForwardingSubchannel {
178
    private final Subchannel delegate;
179
    private volatile long lastUpdated;
180
    private volatile long nonEmptySince;
181
    private volatile double weight;
182

183
    WrrSubchannel(Subchannel delegate) {
1✔
184
      this.delegate = checkNotNull(delegate, "delegate");
1✔
185
    }
1✔
186

187
    @Override
188
    public void start(SubchannelStateListener listener) {
189
      delegate().start(new SubchannelStateListener() {
1✔
190
        @Override
191
        public void onSubchannelState(ConnectivityStateInfo newState) {
192
          if (newState.getState().equals(ConnectivityState.READY)) {
1✔
193
            nonEmptySince = infTime;
1✔
194
          }
195
          listener.onSubchannelState(newState);
1✔
196
        }
1✔
197
      });
198
    }
1✔
199

200
    private double getWeight() {
201
      if (config == null) {
1✔
202
        return 0;
×
203
      }
204
      long now = ticker.nanoTime();
1✔
205
      if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
1✔
206
        nonEmptySince = infTime;
1✔
207
        return 0;
1✔
208
      } else if (now - nonEmptySince < config.blackoutPeriodNanos
1✔
209
          && config.blackoutPeriodNanos > 0) {
1✔
210
        return 0;
1✔
211
      } else {
212
        return weight;
1✔
213
      }
214
    }
215

216
    @Override
217
    protected Subchannel delegate() {
218
      return delegate;
1✔
219
    }
220

221
    final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
222
      private final float errorUtilizationPenalty;
223

224
      OrcaReportListener(float errorUtilizationPenalty) {
1✔
225
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
226
      }
1✔
227

228
      @Override
229
      public void onLoadReport(MetricReport report) {
230
        double newWeight = 0;
1✔
231
        // Prefer application utilization and fallback to CPU utilization if unset.
232
        double utilization =
233
            report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
1✔
234
                : report.getCpuUtilization();
1✔
235
        if (utilization > 0 && report.getQps() > 0) {
1✔
236
          double penalty = 0;
1✔
237
          if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
1✔
238
            penalty = report.getEps() / report.getQps() * errorUtilizationPenalty;
1✔
239
          }
240
          newWeight = report.getQps() / (utilization + penalty);
1✔
241
        }
242
        if (newWeight == 0) {
1✔
243
          return;
1✔
244
        }
245
        if (nonEmptySince == infTime) {
1✔
246
          nonEmptySince = ticker.nanoTime();
1✔
247
        }
248
        lastUpdated = ticker.nanoTime();
1✔
249
        weight = newWeight;
1✔
250
      }
1✔
251
    }
252
  }
253

254
  @VisibleForTesting
255
  final class WeightedRoundRobinPicker extends RoundRobinPicker {
256
    private final List<Subchannel> list;
257
    private final Map<Subchannel, OrcaPerRequestReportListener> subchannelToReportListenerMap =
1✔
258
        new HashMap<>();
259
    private final boolean enableOobLoadReport;
260
    private final float errorUtilizationPenalty;
261
    private volatile EdfScheduler scheduler;
262

263
    WeightedRoundRobinPicker(List<Subchannel> list, boolean enableOobLoadReport,
264
        float errorUtilizationPenalty) {
1✔
265
      checkNotNull(list, "list");
1✔
266
      Preconditions.checkArgument(!list.isEmpty(), "empty list");
1✔
267
      this.list = list;
1✔
268
      for (Subchannel subchannel : list) {
1✔
269
        this.subchannelToReportListenerMap.put(subchannel,
1✔
270
            ((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty));
271
      }
1✔
272
      this.enableOobLoadReport = enableOobLoadReport;
1✔
273
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
274
      updateWeight();
1✔
275
    }
1✔
276

277
    @Override
278
    public PickResult pickSubchannel(PickSubchannelArgs args) {
279
      Subchannel subchannel = list.get(scheduler.pick());
1✔
280
      if (!enableOobLoadReport) {
1✔
281
        return PickResult.withSubchannel(subchannel,
1✔
282
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
1✔
283
                subchannelToReportListenerMap.getOrDefault(subchannel,
1✔
284
                    ((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty))));
285
      } else {
286
        return PickResult.withSubchannel(subchannel);
1✔
287
      }
288
    }
289

290
    private void updateWeight() {
291
      int weightedChannelCount = 0;
1✔
292
      double avgWeight = 0;
1✔
293
      for (Subchannel value : list) {
1✔
294
        double newWeight = ((WrrSubchannel) value).getWeight();
1✔
295
        if (newWeight > 0) {
1✔
296
          avgWeight += newWeight;
1✔
297
          weightedChannelCount++;
1✔
298
        }
299
      }
1✔
300
      EdfScheduler scheduler = new EdfScheduler(list.size(), random);
1✔
301
      if (weightedChannelCount >= 1) {
1✔
302
        avgWeight /= 1.0 * weightedChannelCount;
1✔
303
      } else {
304
        avgWeight = 1;
1✔
305
      }
306
      for (int i = 0; i < list.size(); i++) {
1✔
307
        WrrSubchannel subchannel = (WrrSubchannel) list.get(i);
1✔
308
        double newWeight = subchannel.getWeight();
1✔
309
        scheduler.add(i, newWeight > 0 ? newWeight : avgWeight);
1✔
310
      }
311
      this.scheduler = scheduler;
1✔
312
    }
1✔
313

314
    @Override
315
    public String toString() {
316
      return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
1✔
317
          .add("enableOobLoadReport", enableOobLoadReport)
1✔
318
          .add("errorUtilizationPenalty", errorUtilizationPenalty)
1✔
319
          .add("list", list).toString();
1✔
320
    }
321

322
    @VisibleForTesting
323
    List<Subchannel> getList() {
324
      return list;
1✔
325
    }
326

327
    @Override
328
    public boolean isEquivalentTo(RoundRobinPicker picker) {
329
      if (!(picker instanceof WeightedRoundRobinPicker)) {
1✔
330
        return false;
×
331
      }
332
      WeightedRoundRobinPicker other = (WeightedRoundRobinPicker) picker;
1✔
333
      if (other == this) {
1✔
334
        return true;
×
335
      }
336
      // the lists cannot contain duplicate subchannels
337
      return enableOobLoadReport == other.enableOobLoadReport
1✔
338
          && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
1✔
339
          && list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list);
1✔
340
    }
341
  }
342

343
  /**
344
   * The earliest deadline first implementation in which each object is
345
   * chosen deterministically and periodically with frequency proportional to its weight.
346
   *
347
   * <p>Specifically, each object added to chooser is given a deadline equal to the multiplicative
348
   * inverse of its weight. The place of each object in its deadline is tracked, and each call to
349
   * choose returns the object with the least remaining time in its deadline.
350
   * (Ties are broken by the order in which the children were added to the chooser.) The deadline
351
   * advances by the multiplicative inverse of the object's weight.
352
   * For example, if items A and B are added with weights 0.5 and 0.2, successive chooses return:
353
   *
354
   * <ul>
355
   *   <li>In the first call, the deadlines are A=2 (1/0.5) and B=5 (1/0.2), so A is returned.
356
   *   The deadline of A is updated to 4.
357
   *   <li>Next, the remaining deadlines are A=4 and B=5, so A is returned. The deadline of A (2) is
358
   *       updated to A=6.
359
   *   <li>Remaining deadlines are A=6 and B=5, so B is returned. The deadline of B is updated with
360
   *       with B=10.
361
   *   <li>Remaining deadlines are A=6 and B=10, so A is returned. The deadline of A is updated with
362
   *        A=8.
363
   *   <li>Remaining deadlines are A=8 and B=10, so A is returned. The deadline of A is updated with
364
   *       A=10.
365
   *   <li>Remaining deadlines are A=10 and B=10, so A is returned. The deadline of A is updated
366
   *      with A=12.
367
   *   <li>Remaining deadlines are A=12 and B=10, so B is returned. The deadline of B is updated
368
   *      with B=15.
369
   *   <li>etc.
370
   * </ul>
371
   *
372
   * <p>In short: the entry with the highest weight is preferred.
373
   *
374
   * <ul>
375
   *   <li>add() - O(lg n)
376
   *   <li>pick() - O(lg n)
377
   * </ul>
378
   *
379
   */
380
  @VisibleForTesting
381
  static final class EdfScheduler {
382
    private final PriorityQueue<ObjectState> prioQueue;
383

384
    /**
385
     * Weights below this value will be upped to this minimum weight.
386
     */
387
    private static final double MINIMUM_WEIGHT = 0.0001;
388

389
    private final Object lock = new Object();
1✔
390

391
    private final Random random;
392

393
    /**
394
     * Use the item's deadline as the order in the priority queue. If the deadlines are the same,
395
     * use the index. Index should be unique.
396
     */
397
    EdfScheduler(int initialCapacity, Random random) {
1✔
398
      this.prioQueue = new PriorityQueue<ObjectState>(initialCapacity, (o1, o2) -> {
1✔
399
        if (o1.deadline == o2.deadline) {
1✔
400
          return Integer.compare(o1.index, o2.index);
1✔
401
        } else {
402
          return Double.compare(o1.deadline, o2.deadline);
1✔
403
        }
404
      });
405
      this.random = random;
1✔
406
    }
1✔
407

408
    /**
409
     * Adds the item in the scheduler. This is not thread safe.
410
     *
411
     * @param index The field {@link ObjectState#index} to be added
412
     * @param weight positive weight for the added object
413
     */
414
    void add(int index, double weight) {
415
      checkArgument(weight > 0.0, "Weights need to be positive.");
1✔
416
      ObjectState state = new ObjectState(Math.max(weight, MINIMUM_WEIGHT), index);
1✔
417
      // Randomize the initial deadline.
418
      state.deadline = random.nextDouble() * (1 / state.weight);
1✔
419
      prioQueue.add(state);
1✔
420
    }
1✔
421

422
    /**
423
     * Picks the next WRR object.
424
     */
425
    int pick() {
426
      synchronized (lock) {
1✔
427
        ObjectState minObject = prioQueue.remove();
1✔
428
        minObject.deadline += 1.0 / minObject.weight;
1✔
429
        prioQueue.add(minObject);
1✔
430
        return minObject.index;
1✔
431
      }
432
    }
433
  }
434

435
  /** Holds the state of the object. */
436
  @VisibleForTesting
437
  static class ObjectState {
438
    private final double weight;
439
    private final int index;
440
    private volatile double deadline;
441

442
    ObjectState(double weight, int index) {
1✔
443
      this.weight = weight;
1✔
444
      this.index = index;
1✔
445
    }
1✔
446
  }
447

448
  static final class WeightedRoundRobinLoadBalancerConfig {
449
    final long blackoutPeriodNanos;
450
    final long weightExpirationPeriodNanos;
451
    final boolean enableOobLoadReport;
452
    final long oobReportingPeriodNanos;
453
    final long weightUpdatePeriodNanos;
454
    final float errorUtilizationPenalty;
455

456
    public static Builder newBuilder() {
457
      return new Builder();
1✔
458
    }
459

460
    private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
461
                                                 long weightExpirationPeriodNanos,
462
                                                 boolean enableOobLoadReport,
463
                                                 long oobReportingPeriodNanos,
464
                                                 long weightUpdatePeriodNanos,
465
                                                 float errorUtilizationPenalty) {
1✔
466
      this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
467
      this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
468
      this.enableOobLoadReport = enableOobLoadReport;
1✔
469
      this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
470
      this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
471
      this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
472
    }
1✔
473

474
    static final class Builder {
475
      long blackoutPeriodNanos = 10_000_000_000L; // 10s
1✔
476
      long weightExpirationPeriodNanos = 180_000_000_000L; //3min
1✔
477
      boolean enableOobLoadReport = false;
1✔
478
      long oobReportingPeriodNanos = 10_000_000_000L; // 10s
1✔
479
      long weightUpdatePeriodNanos = 1_000_000_000L; // 1s
1✔
480
      float errorUtilizationPenalty = 1.0F;
1✔
481

482
      private Builder() {
1✔
483

484
      }
1✔
485

486
      Builder setBlackoutPeriodNanos(long blackoutPeriodNanos) {
487
        this.blackoutPeriodNanos = blackoutPeriodNanos;
1✔
488
        return this;
1✔
489
      }
490

491
      Builder setWeightExpirationPeriodNanos(long weightExpirationPeriodNanos) {
492
        this.weightExpirationPeriodNanos = weightExpirationPeriodNanos;
1✔
493
        return this;
1✔
494
      }
495

496
      Builder setEnableOobLoadReport(boolean enableOobLoadReport) {
497
        this.enableOobLoadReport = enableOobLoadReport;
1✔
498
        return this;
1✔
499
      }
500

501
      Builder setOobReportingPeriodNanos(long oobReportingPeriodNanos) {
502
        this.oobReportingPeriodNanos = oobReportingPeriodNanos;
1✔
503
        return this;
1✔
504
      }
505

506
      Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) {
507
        this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
1✔
508
        return this;
1✔
509
      }
510

511
      Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
512
        this.errorUtilizationPenalty = errorUtilizationPenalty;
1✔
513
        return this;
1✔
514
      }
515

516
      WeightedRoundRobinLoadBalancerConfig build() {
517
        return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos,
1✔
518
                weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos,
519
                weightUpdatePeriodNanos, errorUtilizationPenalty);
520
      }
521
    }
522
  }
523
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc