• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20160

28 Jan 2026 12:21AM UTC coverage: 88.71% (+0.04%) from 88.666%
#20160

push

github

ejona86
xds: Normalize weights before combining endpoint and locality weights

Previously, the number of endpoints in a locality would skew how much
traffic was sent to that locality. Also, if endpoints in localities had
wildly different weights, that would impact cross-locality weighting.

For example, consider:
  LocalityA weight=1 endpointWeights=[100, 100, 100, 100]
  LocalityB weight=1 endpointWeights=[1]

The endpoint in LocalityB should have an endpoint weight that is half
the total sum of endpoint weights, in order to receive half the traffic.
But the multiple endpoints in LocalityA would cause it to get 4x the
traffic and the endpoint weights in LocalityA causes them to get 100x
the traffic.

See gRFC A113

35415 of 39922 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.88
/../xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;
22
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
23

24
import com.google.common.collect.ImmutableMap;
25
import com.google.common.primitives.UnsignedInts;
26
import com.google.errorprone.annotations.CheckReturnValue;
27
import io.grpc.Attributes;
28
import io.grpc.EquivalentAddressGroup;
29
import io.grpc.HttpConnectProxiedSocketAddress;
30
import io.grpc.InternalLogId;
31
import io.grpc.LoadBalancer;
32
import io.grpc.LoadBalancerProvider;
33
import io.grpc.LoadBalancerRegistry;
34
import io.grpc.NameResolver;
35
import io.grpc.Status;
36
import io.grpc.StatusOr;
37
import io.grpc.internal.GrpcUtil;
38
import io.grpc.util.GracefulSwitchLoadBalancer;
39
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
40
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
41
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
42
import io.grpc.xds.Endpoints.DropOverload;
43
import io.grpc.xds.Endpoints.LbEndpoint;
44
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
45
import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection;
46
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
47
import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection;
48
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
49
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
50
import io.grpc.xds.XdsClusterResource.CdsUpdate;
51
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
52
import io.grpc.xds.XdsConfig.Subscription;
53
import io.grpc.xds.XdsConfig.XdsClusterConfig;
54
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
55
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
56
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
57
import io.grpc.xds.client.Locality;
58
import io.grpc.xds.client.XdsLogger;
59
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
60
import io.grpc.xds.internal.XdsInternalAttributes;
61
import java.net.InetSocketAddress;
62
import java.net.SocketAddress;
63
import java.util.ArrayList;
64
import java.util.Arrays;
65
import java.util.Collections;
66
import java.util.HashMap;
67
import java.util.HashSet;
68
import java.util.List;
69
import java.util.Map;
70
import java.util.Set;
71
import java.util.TreeMap;
72

73
/**
74
 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
75
 * The top-level cluster may be a plain EDS/logical-DNS cluster or an aggregate cluster formed
76
 * by a group of sub-clusters in a tree hierarchy.
77
 */
78
final class CdsLoadBalancer2 extends LoadBalancer {
79
  static boolean pickFirstWeightedShuffling =
1✔
80
      GrpcUtil.getFlag("GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING", true);
1✔
81

82
  private final XdsLogger logger;
83
  private final Helper helper;
84
  private final LoadBalancerRegistry lbRegistry;
85
  private final ClusterState clusterState = new ClusterState();
1✔
86
  private GracefulSwitchLoadBalancer delegate;
87
  // Following fields are effectively final.
88
  private String clusterName;
89
  private Subscription clusterSubscription;
90

91
  CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
1✔
92
    this.helper = checkNotNull(helper, "helper");
1✔
93
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
94
    this.delegate = new GracefulSwitchLoadBalancer(helper);
1✔
95
    logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
1✔
96
    logger.log(XdsLogLevel.INFO, "Created");
1✔
97
  }
1✔
98

99
  @Override
100
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
101
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
102
    if (this.clusterName == null) {
1✔
103
      CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
104
      logger.log(XdsLogLevel.INFO, "Config: {0}", config);
1✔
105
      if (config.isDynamic) {
1✔
106
        clusterSubscription = resolvedAddresses.getAttributes()
1✔
107
            .get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY)
1✔
108
            .subscribeToCluster(config.name);
1✔
109
      }
110
      this.clusterName = config.name;
1✔
111
    }
112
    XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG);
1✔
113
    StatusOr<XdsClusterConfig> clusterConfigOr = xdsConfig.getClusters().get(clusterName);
1✔
114
    if (clusterConfigOr == null) {
1✔
115
      if (clusterSubscription == null) {
1✔
116
        // Should be impossible, because XdsDependencyManager wouldn't have generated this
117
        return fail(Status.INTERNAL.withDescription(
×
118
            errorPrefix() + "Unable to find non-dynamic cluster"));
×
119
      }
120
      // The dynamic cluster must not have loaded yet
121
      return Status.OK;
1✔
122
    }
123
    if (!clusterConfigOr.hasValue()) {
1✔
124
      return fail(clusterConfigOr.getStatus());
1✔
125
    }
126
    XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
1✔
127

128
    NameResolver.ConfigOrError configOrError;
129
    if (clusterConfig.getChildren() instanceof EndpointConfig) {
1✔
130
      // The LB policy config is provided in service_config.proto/JSON format.
131
      configOrError =
1✔
132
              GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
1✔
133
                      Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()),
1✔
134
                      lbRegistry);
135
      if (configOrError.getError() != null) {
1✔
136
        // Should be impossible, because XdsClusterResource validated this
137
        return fail(Status.INTERNAL.withDescription(
×
138
                errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
×
139
      }
140

141
      StatusOr<EdsUpdate> edsUpdate = getEdsUpdate(xdsConfig, clusterName);
1✔
142
      StatusOr<ClusterResolutionResult> statusOrResult = clusterState.edsUpdateToResult(
1✔
143
          clusterName,
144
          clusterConfig.getClusterResource(),
1✔
145
          configOrError.getConfig(),
1✔
146
          edsUpdate);
147
      if (!statusOrResult.hasValue()) {
1✔
148
        Status status = Status.UNAVAILABLE
1✔
149
            .withDescription(statusOrResult.getStatus().getDescription())
1✔
150
            .withCause(statusOrResult.getStatus().getCause());
1✔
151
        delegate.handleNameResolutionError(status);
1✔
152
        return status;
1✔
153
      }
154
      ClusterResolutionResult result = statusOrResult.getValue();
1✔
155
      List<EquivalentAddressGroup> addresses = result.addresses;
1✔
156
      if (addresses.isEmpty()) {
1✔
157
        Status status = Status.UNAVAILABLE
1✔
158
            .withDescription("No usable endpoint from cluster: " + clusterName);
1✔
159
        delegate.handleNameResolutionError(status);
1✔
160
        return status;
1✔
161
      }
162
      Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
163
          lbRegistry.getProvider(PRIORITY_POLICY_NAME),
1✔
164
          new PriorityLbConfig(
165
              Collections.unmodifiableMap(result.priorityChildConfigs),
1✔
166
              Collections.unmodifiableList(result.priorities)));
1✔
167
      return delegate.acceptResolvedAddresses(
1✔
168
          resolvedAddresses.toBuilder()
1✔
169
            .setLoadBalancingPolicyConfig(gracefulConfig)
1✔
170
            .setAddresses(Collections.unmodifiableList(addresses))
1✔
171
            .build());
1✔
172
    } else if (clusterConfig.getChildren() instanceof AggregateConfig) {
1✔
173
      Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
1✔
174
      List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
1✔
175
      for (String childCluster: leafClusters) {
1✔
176
        priorityChildConfigs.put(childCluster,
1✔
177
                new PriorityChildConfig(
178
                        GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
179
                                lbRegistry.getProvider(CDS_POLICY_NAME),
1✔
180
                                new CdsConfig(childCluster)),
181
                        false));
182
      }
1✔
183
      Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
184
          lbRegistry.getProvider(PRIORITY_POLICY_NAME),
1✔
185
          new PriorityLoadBalancerProvider.PriorityLbConfig(
186
              Collections.unmodifiableMap(priorityChildConfigs), leafClusters));
1✔
187
      return delegate.acceptResolvedAddresses(
1✔
188
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
1✔
189
    } else {
190
      return fail(Status.INTERNAL.withDescription(
×
191
              errorPrefix() + "Unexpected cluster children type: "
×
192
                      + clusterConfig.getChildren().getClass()));
×
193
    }
194
  }
195

196
  @Override
197
  public void handleNameResolutionError(Status error) {
198
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
199
    if (delegate != null) {
1✔
200
      delegate.handleNameResolutionError(error);
1✔
201
    } else {
202
      helper.updateBalancingState(
×
203
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
×
204
    }
205
  }
1✔
206

207
  @Override
208
  public void shutdown() {
209
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
210
    delegate.shutdown();
1✔
211
    delegate = new GracefulSwitchLoadBalancer(helper);
1✔
212
    if (clusterSubscription != null) {
1✔
213
      clusterSubscription.close();
1✔
214
      clusterSubscription = null;
1✔
215
    }
216
  }
1✔
217

218
  @CheckReturnValue // don't forget to return up the stack after the fail call
219
  private Status fail(Status error) {
220
    delegate.shutdown();
1✔
221
    helper.updateBalancingState(
1✔
222
        TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
223
    return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
1✔
224
  }
225

226
  private String errorPrefix() {
227
    return "CdsLb for " + clusterName + ": ";
×
228
  }
229

230
  /**
231
   * The number of bits assigned to the fractional part of fixed-point values. We normalize weights
232
   * to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 ==
233
   * 100% of traffic). We reserve at least one bit for the whole number so that we don't need to
234
   * special case a single item, and so that we can round up very low values without risking uint32
235
   * overflow of the sum of weights.
236
   */
237
  private static final int FIXED_POINT_FRACTIONAL_BITS = 31;
238

239
  /** Divide two uint32s and produce a fixed-point uint32 result. */
240
  private static long fractionToFixedPoint(long numerator, long denominator) {
241
    long one = 1L << FIXED_POINT_FRACTIONAL_BITS;
1✔
242
    return numerator * one / denominator;
1✔
243
  }
244

245
  /** Multiply two uint32 fixed-point numbers, returning a uint32 fixed-point. */
246
  private static long fixedPointMultiply(long a, long b) {
247
    return (a * b) >> FIXED_POINT_FRACTIONAL_BITS;
1✔
248
  }
249

250
  private static StatusOr<EdsUpdate> getEdsUpdate(XdsConfig xdsConfig, String cluster) {
251
    StatusOr<XdsClusterConfig> clusterConfig = xdsConfig.getClusters().get(cluster);
1✔
252
    if (clusterConfig == null) {
1✔
253
      return StatusOr.fromStatus(Status.INTERNAL
×
254
          .withDescription("BUG: cluster resolver could not find cluster in xdsConfig"));
×
255
    }
256
    if (!clusterConfig.hasValue()) {
1✔
257
      return StatusOr.fromStatus(clusterConfig.getStatus());
×
258
    }
259
    if (!(clusterConfig.getValue().getChildren() instanceof XdsClusterConfig.EndpointConfig)) {
1✔
260
      return StatusOr.fromStatus(Status.INTERNAL
×
261
          .withDescription("BUG: cluster resolver cluster with children of unknown type"));
×
262
    }
263
    XdsClusterConfig.EndpointConfig endpointConfig =
1✔
264
        (XdsClusterConfig.EndpointConfig) clusterConfig.getValue().getChildren();
1✔
265
    return endpointConfig.getEndpoint();
1✔
266
  }
267

268
  /**
269
   * Generates a string that represents the priority in the LB policy config. The string is unique
270
   * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2.
271
   * The ordering is undefined for priorities in different clusters.
272
   */
273
  private static String priorityName(String cluster, int priority) {
274
    return cluster + "[child" + priority + "]";
1✔
275
  }
276

277
  /**
278
   * Generates a string that represents the locality in the LB policy config. The string is unique
279
   * across all localities in all clusters.
280
   */
281
  private static String localityName(Locality locality) {
282
    return "{region=\"" + locality.region()
1✔
283
        + "\", zone=\"" + locality.zone()
1✔
284
        + "\", sub_zone=\"" + locality.subZone()
1✔
285
        + "\"}";
286
  }
287

288
  private final class ClusterState {
1✔
289
    private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
1✔
290
    int priorityNameGenId = 1;
1✔
291

292
    StatusOr<ClusterResolutionResult> edsUpdateToResult(
293
        String clusterName,
294
        CdsUpdate discovery,
295
        Object lbConfig,
296
        StatusOr<EdsUpdate> updateOr) {
297
      if (!updateOr.hasValue()) {
1✔
298
        return StatusOr.fromStatus(updateOr.getStatus());
1✔
299
      }
300
      EdsUpdate update = updateOr.getValue();
1✔
301
      logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update);
1✔
302
      if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
303
        logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories",
×
304
            clusterName, update.localityLbEndpointsMap.size(),
×
305
            update.dropPolicies.size());
×
306
      }
307
      Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
1✔
308
          update.localityLbEndpointsMap;
309
      List<DropOverload> dropOverloads = update.dropPolicies;
1✔
310
      List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
311
      Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
1✔
312
      List<String> sortedPriorityNames =
1✔
313
          generatePriorityNames(clusterName, localityLbEndpoints);
1✔
314
      Map<String, Long> priorityLocalityWeightSums;
315
      if (pickFirstWeightedShuffling) {
1✔
316
        priorityLocalityWeightSums = new HashMap<>(sortedPriorityNames.size() * 2);
1✔
317
        for (Locality locality : localityLbEndpoints.keySet()) {
1✔
318
          LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
1✔
319
          String priorityName = localityPriorityNames.get(locality);
1✔
320
          Long sum = priorityLocalityWeightSums.get(priorityName);
1✔
321
          if (sum == null) {
1✔
322
            sum = 0L;
1✔
323
          }
324
          long weight = UnsignedInts.toLong(localityLbInfo.localityWeight());
1✔
325
          priorityLocalityWeightSums.put(priorityName, sum + weight);
1✔
326
        }
1✔
327
      } else {
328
        priorityLocalityWeightSums = null;
1✔
329
      }
330

331
      for (Locality locality : localityLbEndpoints.keySet()) {
1✔
332
        LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
1✔
333
        String priorityName = localityPriorityNames.get(locality);
1✔
334
        boolean discard = true;
1✔
335
        // These sums _should_ fit in uint32, but XdsEndpointResource isn't actually verifying that
336
        // is true today. Since we are using long to avoid signedness trouble, the math happens to
337
        // still work if it turns out the sums exceed uint32.
338
        long localityWeightSum = 0;
1✔
339
        long endpointWeightSum = 0;
1✔
340
        if (pickFirstWeightedShuffling) {
1✔
341
          localityWeightSum = priorityLocalityWeightSums.get(priorityName);
1✔
342
          for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
1✔
343
            if (endpoint.isHealthy()) {
1✔
344
              endpointWeightSum += UnsignedInts.toLong(endpoint.loadBalancingWeight());
1✔
345
            }
346
          }
1✔
347
        }
348
        for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
1✔
349
          if (endpoint.isHealthy()) {
1✔
350
            discard = false;
1✔
351
            long weight;
352
            if (pickFirstWeightedShuffling) {
1✔
353
              // Combine locality and endpoint weights as defined by gRFC A113
354
              long localityWeight = fractionToFixedPoint(
1✔
355
                  UnsignedInts.toLong(localityLbInfo.localityWeight()), localityWeightSum);
1✔
356
              long endpointWeight = fractionToFixedPoint(
1✔
357
                  UnsignedInts.toLong(endpoint.loadBalancingWeight()), endpointWeightSum);
1✔
358
              weight = fixedPointMultiply(localityWeight, endpointWeight);
1✔
359
              if (weight == 0) {
1✔
360
                weight = 1;
×
361
              }
362
            } else {
1✔
363
              weight = localityLbInfo.localityWeight();
1✔
364
              if (endpoint.loadBalancingWeight() != 0) {
1✔
365
                weight *= endpoint.loadBalancingWeight();
1✔
366
              }
367
            }
368

369
            String localityName = localityName(locality);
1✔
370
            Attributes attr =
1✔
371
                endpoint.eag().getAttributes().toBuilder()
1✔
372
                    .set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY, locality)
1✔
373
                    .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName)
1✔
374
                    .set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY_WEIGHT,
1✔
375
                        localityLbInfo.localityWeight())
1✔
376
                    .set(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT, weight)
1✔
377
                    .set(XdsInternalAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
1✔
378
                    .build();
1✔
379
            EquivalentAddressGroup eag;
380
            if (discovery.isHttp11ProxyAvailable()) {
1✔
381
              List<SocketAddress> rewrittenAddresses = new ArrayList<>();
1✔
382
              for (SocketAddress addr : endpoint.eag().getAddresses()) {
1✔
383
                rewrittenAddresses.add(rewriteAddress(
1✔
384
                    addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
1✔
385
              }
1✔
386
              eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
1✔
387
            } else {
1✔
388
              eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
1✔
389
            }
390
            eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
391
            addresses.add(eag);
1✔
392
          }
393
        }
1✔
394
        if (discard) {
1✔
395
          logger.log(XdsLogLevel.INFO,
1✔
396
              "Discard locality {0} with 0 healthy endpoints", locality);
397
          continue;
1✔
398
        }
399
        if (!prioritizedLocalityWeights.containsKey(priorityName)) {
1✔
400
          prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
1✔
401
        }
402
        prioritizedLocalityWeights.get(priorityName).put(
1✔
403
            locality, localityLbInfo.localityWeight());
1✔
404
      }
1✔
405
      if (prioritizedLocalityWeights.isEmpty()) {
1✔
406
        // Will still update the result, as if the cluster resource is revoked.
407
        logger.log(XdsLogLevel.INFO,
1✔
408
            "Cluster {0} has no usable priority/locality/endpoint", clusterName);
409
      }
410
      sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
1✔
411
      Map<String, PriorityChildConfig> priorityChildConfigs =
1✔
412
          generatePriorityChildConfigs(
1✔
413
              clusterName, discovery, lbConfig, lbRegistry,
1✔
414
              prioritizedLocalityWeights, dropOverloads);
415
      return StatusOr.fromValue(new ClusterResolutionResult(addresses, priorityChildConfigs,
1✔
416
          sortedPriorityNames));
417
    }
418

419
    private SocketAddress rewriteAddress(SocketAddress addr,
420
        ImmutableMap<String, Object> endpointMetadata,
421
        ImmutableMap<String, Object> localityMetadata) {
422
      if (!(addr instanceof InetSocketAddress)) {
1✔
423
        return addr;
×
424
      }
425

426
      SocketAddress proxyAddress;
427
      try {
428
        proxyAddress = (SocketAddress) endpointMetadata.get(
1✔
429
            "envoy.http11_proxy_transport_socket.proxy_address");
430
        if (proxyAddress == null) {
1✔
431
          proxyAddress = (SocketAddress) localityMetadata.get(
1✔
432
              "envoy.http11_proxy_transport_socket.proxy_address");
433
        }
434
      } catch (ClassCastException e) {
×
435
        return addr;
×
436
      }
1✔
437

438
      if (proxyAddress == null) {
1✔
439
        return addr;
1✔
440
      }
441

442
      return HttpConnectProxiedSocketAddress.newBuilder()
1✔
443
          .setTargetAddress((InetSocketAddress) addr)
1✔
444
          .setProxyAddress(proxyAddress)
1✔
445
          .build();
1✔
446
    }
447

448
    private List<String> generatePriorityNames(String name,
449
        Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
450
      TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
1✔
451
      for (Locality locality : localityLbEndpoints.keySet()) {
1✔
452
        int priority = localityLbEndpoints.get(locality).priority();
1✔
453
        if (!todo.containsKey(priority)) {
1✔
454
          todo.put(priority, new ArrayList<>());
1✔
455
        }
456
        todo.get(priority).add(locality);
1✔
457
      }
1✔
458
      Map<Locality, String> newNames = new HashMap<>();
1✔
459
      Set<String> usedNames = new HashSet<>();
1✔
460
      List<String> ret = new ArrayList<>();
1✔
461
      for (Integer priority: todo.keySet()) {
1✔
462
        String foundName = "";
1✔
463
        for (Locality locality : todo.get(priority)) {
1✔
464
          if (localityPriorityNames.containsKey(locality)
1✔
465
              && usedNames.add(localityPriorityNames.get(locality))) {
1✔
466
            foundName = localityPriorityNames.get(locality);
1✔
467
            break;
1✔
468
          }
469
        }
1✔
470
        if ("".equals(foundName)) {
1✔
471
          foundName = priorityName(name, priorityNameGenId++);
1✔
472
        }
473
        for (Locality locality : todo.get(priority)) {
1✔
474
          newNames.put(locality, foundName);
1✔
475
        }
1✔
476
        ret.add(foundName);
1✔
477
      }
1✔
478
      localityPriorityNames = newNames;
1✔
479
      return ret;
1✔
480
    }
481
  }
482

483
  private static class ClusterResolutionResult {
484
    // Endpoint addresses.
485
    private final List<EquivalentAddressGroup> addresses;
486
    // Config (include load balancing policy/config) for each priority in the cluster.
487
    private final Map<String, PriorityChildConfig> priorityChildConfigs;
488
    // List of priority names ordered in descending priorities.
489
    private final List<String> priorities;
490

491
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
492
        Map<String, PriorityChildConfig> configs, List<String> priorities) {
1✔
493
      this.addresses = addresses;
1✔
494
      this.priorityChildConfigs = configs;
1✔
495
      this.priorities = priorities;
1✔
496
    }
1✔
497
  }
498

499
  /**
500
   * Generates configs to be used in the priority LB policy for priorities in a cluster.
501
   *
502
   * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
503
   * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental
504
   */
505
  private static Map<String, PriorityChildConfig> generatePriorityChildConfigs(
506
      String clusterName,
507
      CdsUpdate discovery,
508
      Object endpointLbConfig,
509
      LoadBalancerRegistry lbRegistry,
510
      Map<String, Map<Locality, Integer>> prioritizedLocalityWeights,
511
      List<DropOverload> dropOverloads) {
512
    Map<String, PriorityChildConfig> configs = new HashMap<>();
1✔
513
    for (String priority : prioritizedLocalityWeights.keySet()) {
1✔
514
      ClusterImplConfig clusterImplConfig =
1✔
515
          new ClusterImplConfig(
516
              clusterName, discovery.edsServiceName(), discovery.lrsServerInfo(),
1✔
517
              discovery.maxConcurrentRequests(), dropOverloads, endpointLbConfig,
1✔
518
              discovery.upstreamTlsContext(), discovery.filterMetadata(),
1✔
519
              discovery.backendMetricPropagation());
1✔
520
      LoadBalancerProvider clusterImplLbProvider =
1✔
521
          lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
522
      Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
523
          clusterImplLbProvider, clusterImplConfig);
524

525
      // If outlier detection has been configured we wrap the child policy in the outlier detection
526
      // load balancer.
527
      if (discovery.outlierDetection() != null) {
1✔
528
        LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider(
1✔
529
            "outlier_detection_experimental");
530
        priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
531
            outlierDetectionProvider,
532
            buildOutlierDetectionLbConfig(discovery.outlierDetection(), priorityChildPolicy));
1✔
533
      }
534

535
      boolean isEds = discovery.clusterType() == ClusterType.EDS;
1✔
536
      PriorityChildConfig priorityChildConfig =
1✔
537
          new PriorityChildConfig(priorityChildPolicy, isEds /* ignoreReresolution */);
538
      configs.put(priority, priorityChildConfig);
1✔
539
    }
1✔
540
    return configs;
1✔
541
  }
542

543
  /**
544
   * Converts {@link OutlierDetection} that represents the xDS configuration to {@link
545
   * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer}
546
   * understands.
547
   */
548
  private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig(
549
      OutlierDetection outlierDetection, Object childConfig) {
550
    OutlierDetectionLoadBalancerConfig.Builder configBuilder
1✔
551
        = new OutlierDetectionLoadBalancerConfig.Builder();
552

553
    configBuilder.setChildConfig(childConfig);
1✔
554

555
    if (outlierDetection.intervalNanos() != null) {
1✔
556
      configBuilder.setIntervalNanos(outlierDetection.intervalNanos());
1✔
557
    }
558
    if (outlierDetection.baseEjectionTimeNanos() != null) {
1✔
559
      configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos());
1✔
560
    }
561
    if (outlierDetection.maxEjectionTimeNanos() != null) {
1✔
562
      configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos());
1✔
563
    }
564
    if (outlierDetection.maxEjectionPercent() != null) {
1✔
565
      configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent());
1✔
566
    }
567

568
    SuccessRateEjection successRate = outlierDetection.successRateEjection();
1✔
569
    if (successRate != null) {
1✔
570
      OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder
571
          successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
572
          .SuccessRateEjection.Builder();
573

574
      if (successRate.stdevFactor() != null) {
1✔
575
        successRateConfigBuilder.setStdevFactor(successRate.stdevFactor());
1✔
576
      }
577
      if (successRate.enforcementPercentage() != null) {
1✔
578
        successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage());
1✔
579
      }
580
      if (successRate.minimumHosts() != null) {
1✔
581
        successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts());
1✔
582
      }
583
      if (successRate.requestVolume() != null) {
1✔
584
        successRateConfigBuilder.setRequestVolume(successRate.requestVolume());
1✔
585
      }
586

587
      configBuilder.setSuccessRateEjection(successRateConfigBuilder.build());
1✔
588
    }
589

590
    FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection();
1✔
591
    if (failurePercentage != null) {
1✔
592
      OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder
593
          failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
594
          .FailurePercentageEjection.Builder();
595

596
      if (failurePercentage.threshold() != null) {
1✔
597
        failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold());
1✔
598
      }
599
      if (failurePercentage.enforcementPercentage() != null) {
1✔
600
        failurePercentageConfigBuilder.setEnforcementPercentage(
1✔
601
            failurePercentage.enforcementPercentage());
1✔
602
      }
603
      if (failurePercentage.minimumHosts() != null) {
1✔
604
        failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts());
1✔
605
      }
606
      if (failurePercentage.requestVolume() != null) {
1✔
607
        failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume());
1✔
608
      }
609

610
      configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build());
1✔
611
    }
612

613
    return configBuilder.build();
1✔
614
  }
615
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc