• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20239

14 Apr 2026 05:21PM UTC coverage: 88.809% (-0.003%) from 88.812%
#20239

push

github

web-flow
core,xds: Fix backend_service plumbing for subchannel metrics (#12735)

This PR fixes #12432.

Subchannel metrics read backend_service from EAG attributes, but xDS
currently only populates the resolution result attribute. As a result,
grpc.lb.backend_service is left unset for subchannel metrics in the cds
path.

This change adds an internal EAG-level backend_service attribute in cds
and has InternalSubchannel read that attribute for subchannel metrics,
while keeping a fallback to the existing resolution result attribute.

This PR is intentionally scoped to subchannel metrics only and does not
attempt the broader #12431 plumbing changes.

36022 of 40561 relevant lines covered (88.81%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.9
/../xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;
22
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
23

24
import com.google.common.collect.ImmutableMap;
25
import com.google.common.primitives.UnsignedInts;
26
import com.google.errorprone.annotations.CheckReturnValue;
27
import io.grpc.Attributes;
28
import io.grpc.EquivalentAddressGroup;
29
import io.grpc.HttpConnectProxiedSocketAddress;
30
import io.grpc.InternalEquivalentAddressGroup;
31
import io.grpc.InternalLogId;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.LoadBalancerRegistry;
35
import io.grpc.NameResolver;
36
import io.grpc.Status;
37
import io.grpc.StatusOr;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.util.GracefulSwitchLoadBalancer;
40
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
41
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
42
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
43
import io.grpc.xds.Endpoints.DropOverload;
44
import io.grpc.xds.Endpoints.LbEndpoint;
45
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
46
import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection;
47
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
48
import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection;
49
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
50
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
51
import io.grpc.xds.XdsClusterResource.CdsUpdate;
52
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
53
import io.grpc.xds.XdsConfig.Subscription;
54
import io.grpc.xds.XdsConfig.XdsClusterConfig;
55
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
56
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
57
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
58
import io.grpc.xds.client.Locality;
59
import io.grpc.xds.client.XdsLogger;
60
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
61
import io.grpc.xds.internal.XdsInternalAttributes;
62
import java.net.InetSocketAddress;
63
import java.net.SocketAddress;
64
import java.util.ArrayList;
65
import java.util.Arrays;
66
import java.util.Collections;
67
import java.util.HashMap;
68
import java.util.HashSet;
69
import java.util.List;
70
import java.util.Map;
71
import java.util.Set;
72
import java.util.TreeMap;
73

74
/**
75
 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
76
 * The top-level cluster may be a plain EDS/logical-DNS cluster or an aggregate cluster formed
77
 * by a group of sub-clusters in a tree hierarchy.
78
 */
79
final class CdsLoadBalancer2 extends LoadBalancer {
80
  // Selects how per-endpoint weights are computed in ClusterState.edsUpdateToResult: when true,
  // locality and endpoint weights are combined as fixed-point fractions (the gRFC A113 branch);
  // when false, the raw locality weight is multiplied by the endpoint weight. Initialized once
  // from the GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING flag with a default of true.
  // NOTE(review): non-final, presumably so tests can toggle it — confirm.
  static boolean pickFirstWeightedShuffling =
1✔
81
      GrpcUtil.getFlag("GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING", true);
1✔
82

83
  // Logger bound to a "cds-lb" log id allocated in the constructor.
  private final XdsLogger logger;
84
  // Helper supplied at construction; used to publish TRANSIENT_FAILURE pickers directly.
  private final Helper helper;
85
  // Registry used to look up the priority, cds, cluster_impl and outlier detection providers.
  private final LoadBalancerRegistry lbRegistry;
86
  // Carries locality-to-priority-name assignments across successive endpoint updates.
  private final ClusterState clusterState = new ClusterState();
1✔
87
  // Child policy wrapper that receives the generated config; replaced by a fresh instance in
  // shutdown().
  private GracefulSwitchLoadBalancer delegate;
88
  // Following fields are effectively final.
  // Both are assigned during the first acceptResolvedAddresses() call.
89
  // Name of the cluster this balancer serves, taken from the first CdsConfig received.
  private String clusterName;
90
  // Set only when the first CdsConfig is dynamic; closed and cleared in shutdown().
  private Subscription clusterSubscription;
91

92
  CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
1✔
93
    this.helper = checkNotNull(helper, "helper");
1✔
94
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
95
    this.delegate = new GracefulSwitchLoadBalancer(helper);
1✔
96
    logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
1✔
97
    logger.log(XdsLogLevel.INFO, "Created");
1✔
98
  }
1✔
99

100
  @Override
101
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
102
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
103
    if (this.clusterName == null) {
1✔
104
      CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
105
      logger.log(XdsLogLevel.INFO, "Config: {0}", config);
1✔
106
      if (config.isDynamic) {
1✔
107
        clusterSubscription = resolvedAddresses.getAttributes()
1✔
108
            .get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY)
1✔
109
            .subscribeToCluster(config.name);
1✔
110
      }
111
      this.clusterName = config.name;
1✔
112
    }
113
    XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG);
1✔
114
    StatusOr<XdsClusterConfig> clusterConfigOr = xdsConfig.getClusters().get(clusterName);
1✔
115
    if (clusterConfigOr == null) {
1✔
116
      if (clusterSubscription == null) {
1✔
117
        // Should be impossible, because XdsDependencyManager wouldn't have generated this
118
        return fail(Status.INTERNAL.withDescription(
×
119
            errorPrefix() + "Unable to find non-dynamic cluster"));
×
120
      }
121
      // The dynamic cluster must not have loaded yet
122
      return Status.OK;
1✔
123
    }
124
    if (!clusterConfigOr.hasValue()) {
1✔
125
      return fail(clusterConfigOr.getStatus());
1✔
126
    }
127
    XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
1✔
128

129
    NameResolver.ConfigOrError configOrError;
130
    if (clusterConfig.getChildren() instanceof EndpointConfig) {
1✔
131
      // The LB policy config is provided in service_config.proto/JSON format.
132
      configOrError =
1✔
133
              GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
1✔
134
                      Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()),
1✔
135
                      lbRegistry);
136
      if (configOrError.getError() != null) {
1✔
137
        // Should be impossible, because XdsClusterResource validated this
138
        return fail(Status.INTERNAL.withDescription(
×
139
                errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
×
140
      }
141

142
      StatusOr<EdsUpdate> edsUpdate = getEdsUpdate(xdsConfig, clusterName);
1✔
143
      StatusOr<ClusterResolutionResult> statusOrResult = clusterState.edsUpdateToResult(
1✔
144
          clusterName,
145
          clusterConfig.getClusterResource(),
1✔
146
          configOrError.getConfig(),
1✔
147
          edsUpdate);
148
      if (!statusOrResult.hasValue()) {
1✔
149
        Status status = Status.UNAVAILABLE
1✔
150
            .withDescription(statusOrResult.getStatus().getDescription())
1✔
151
            .withCause(statusOrResult.getStatus().getCause());
1✔
152
        delegate.handleNameResolutionError(status);
1✔
153
        return status;
1✔
154
      }
155
      ClusterResolutionResult result = statusOrResult.getValue();
1✔
156
      List<EquivalentAddressGroup> addresses = result.addresses;
1✔
157
      if (addresses.isEmpty()) {
1✔
158
        Status status = Status.UNAVAILABLE
1✔
159
            .withDescription("No usable endpoint from cluster: " + clusterName);
1✔
160
        delegate.handleNameResolutionError(status);
1✔
161
        return status;
1✔
162
      }
163
      Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
164
          lbRegistry.getProvider(PRIORITY_POLICY_NAME),
1✔
165
          new PriorityLbConfig(
166
              Collections.unmodifiableMap(result.priorityChildConfigs),
1✔
167
              Collections.unmodifiableList(result.priorities)));
1✔
168
      return delegate.acceptResolvedAddresses(
1✔
169
          resolvedAddresses.toBuilder()
1✔
170
            .setLoadBalancingPolicyConfig(gracefulConfig)
1✔
171
            .setAddresses(Collections.unmodifiableList(addresses))
1✔
172
            .build());
1✔
173
    } else if (clusterConfig.getChildren() instanceof AggregateConfig) {
1✔
174
      Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
1✔
175
      List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
1✔
176
      for (String childCluster: leafClusters) {
1✔
177
        priorityChildConfigs.put(childCluster,
1✔
178
                new PriorityChildConfig(
179
                        GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
180
                                lbRegistry.getProvider(CDS_POLICY_NAME),
1✔
181
                                new CdsConfig(childCluster)),
182
                        false));
183
      }
1✔
184
      Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
185
          lbRegistry.getProvider(PRIORITY_POLICY_NAME),
1✔
186
          new PriorityLoadBalancerProvider.PriorityLbConfig(
187
              Collections.unmodifiableMap(priorityChildConfigs), leafClusters));
1✔
188
      return delegate.acceptResolvedAddresses(
1✔
189
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
1✔
190
    } else {
191
      return fail(Status.INTERNAL.withDescription(
×
192
              errorPrefix() + "Unexpected cluster children type: "
×
193
                      + clusterConfig.getChildren().getClass()));
×
194
    }
195
  }
196

197
  @Override
198
  public void handleNameResolutionError(Status error) {
199
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
200
    if (delegate != null) {
1✔
201
      delegate.handleNameResolutionError(error);
1✔
202
    } else {
203
      helper.updateBalancingState(
×
204
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
×
205
    }
206
  }
1✔
207

208
  @Override
209
  public void shutdown() {
210
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
211
    delegate.shutdown();
1✔
212
    delegate = new GracefulSwitchLoadBalancer(helper);
1✔
213
    if (clusterSubscription != null) {
1✔
214
      clusterSubscription.close();
1✔
215
      clusterSubscription = null;
1✔
216
    }
217
  }
1✔
218

219
  @CheckReturnValue // don't forget to return up the stack after the fail call
220
  private Status fail(Status error) {
221
    delegate.shutdown();
1✔
222
    helper.updateBalancingState(
1✔
223
        TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
224
    return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
1✔
225
  }
226

227
  private String errorPrefix() {
228
    return "CdsLb for " + clusterName + ": ";
×
229
  }
230

231
  /**
232
   * The number of bits assigned to the fractional part of fixed-point values. We normalize weights
233
   * to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 ==
234
   * 100% of traffic). We reserve at least one bit for the whole number so that we don't need to
235
   * special case a single item, and so that we can round up very low values without risking uint32
236
   * overflow of the sum of weights.
237
   */
238
  private static final int FIXED_POINT_FRACTIONAL_BITS = 31;
239

240
  /** Divide two uint32s and produce a fixed-point uint32 result. */
241
  private static long fractionToFixedPoint(long numerator, long denominator) {
242
    long one = 1L << FIXED_POINT_FRACTIONAL_BITS;
1✔
243
    return numerator * one / denominator;
1✔
244
  }
245

246
  /** Multiply two uint32 fixed-point numbers, returning a uint32 fixed-point. */
247
  private static long fixedPointMultiply(long a, long b) {
248
    return (a * b) >> FIXED_POINT_FRACTIONAL_BITS;
1✔
249
  }
250

251
  private static StatusOr<EdsUpdate> getEdsUpdate(XdsConfig xdsConfig, String cluster) {
252
    StatusOr<XdsClusterConfig> clusterConfig = xdsConfig.getClusters().get(cluster);
1✔
253
    if (clusterConfig == null) {
1✔
254
      return StatusOr.fromStatus(Status.INTERNAL
×
255
          .withDescription("BUG: cluster resolver could not find cluster in xdsConfig"));
×
256
    }
257
    if (!clusterConfig.hasValue()) {
1✔
258
      return StatusOr.fromStatus(clusterConfig.getStatus());
×
259
    }
260
    if (!(clusterConfig.getValue().getChildren() instanceof XdsClusterConfig.EndpointConfig)) {
1✔
261
      return StatusOr.fromStatus(Status.INTERNAL
×
262
          .withDescription("BUG: cluster resolver cluster with children of unknown type"));
×
263
    }
264
    XdsClusterConfig.EndpointConfig endpointConfig =
1✔
265
        (XdsClusterConfig.EndpointConfig) clusterConfig.getValue().getChildren();
1✔
266
    return endpointConfig.getEndpoint();
1✔
267
  }
268

269
  /**
270
   * Generates a string that represents the priority in the LB policy config. The string is unique
271
   * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2.
272
   * The ordering is undefined for priorities in different clusters.
273
   */
274
  private static String priorityName(String cluster, int priority) {
275
    return cluster + "[child" + priority + "]";
1✔
276
  }
277

278
  /**
279
   * Generates a string that represents the locality in the LB policy config. The string is unique
280
   * across all localities in all clusters.
281
   */
282
  private static String localityName(Locality locality) {
283
    return "{region=\"" + locality.region()
1✔
284
        + "\", zone=\"" + locality.zone()
1✔
285
        + "\", sub_zone=\"" + locality.subZone()
1✔
286
        + "\"}";
287
  }
288

289
  private final class ClusterState {
1✔
290
    // Priority name assigned to each locality by the most recent generatePriorityNames() call;
    // consulted on the next update so that an existing name can be reused.
    private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
1✔
291
    // Next number handed to priorityName() when a fresh priority name has to be generated.
    int priorityNameGenId = 1;
1✔
292

293
    StatusOr<ClusterResolutionResult> edsUpdateToResult(
294
        String clusterName,
295
        CdsUpdate discovery,
296
        Object lbConfig,
297
        StatusOr<EdsUpdate> updateOr) {
298
      if (!updateOr.hasValue()) {
1✔
299
        return StatusOr.fromStatus(updateOr.getStatus());
1✔
300
      }
301
      EdsUpdate update = updateOr.getValue();
1✔
302
      logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update);
1✔
303
      if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
304
        logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories",
×
305
            clusterName, update.localityLbEndpointsMap.size(),
×
306
            update.dropPolicies.size());
×
307
      }
308
      Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
1✔
309
          update.localityLbEndpointsMap;
310
      List<DropOverload> dropOverloads = update.dropPolicies;
1✔
311
      List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
312
      Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
1✔
313
      List<String> sortedPriorityNames =
1✔
314
          generatePriorityNames(clusterName, localityLbEndpoints);
1✔
315
      Map<String, Long> priorityLocalityWeightSums;
316
      if (pickFirstWeightedShuffling) {
1✔
317
        priorityLocalityWeightSums = new HashMap<>(sortedPriorityNames.size() * 2);
1✔
318
        for (Locality locality : localityLbEndpoints.keySet()) {
1✔
319
          LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
1✔
320
          String priorityName = localityPriorityNames.get(locality);
1✔
321
          Long sum = priorityLocalityWeightSums.get(priorityName);
1✔
322
          if (sum == null) {
1✔
323
            sum = 0L;
1✔
324
          }
325
          long weight = UnsignedInts.toLong(localityLbInfo.localityWeight());
1✔
326
          priorityLocalityWeightSums.put(priorityName, sum + weight);
1✔
327
        }
1✔
328
      } else {
329
        priorityLocalityWeightSums = null;
1✔
330
      }
331

332
      for (Locality locality : localityLbEndpoints.keySet()) {
1✔
333
        LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
1✔
334
        String priorityName = localityPriorityNames.get(locality);
1✔
335
        boolean discard = true;
1✔
336
        // These sums _should_ fit in uint32, but XdsEndpointResource isn't actually verifying that
337
        // is true today. Since we are using long to avoid signedness trouble, the math happens to
338
        // still work if it turns out the sums exceed uint32.
339
        long localityWeightSum = 0;
1✔
340
        long endpointWeightSum = 0;
1✔
341
        if (pickFirstWeightedShuffling) {
1✔
342
          localityWeightSum = priorityLocalityWeightSums.get(priorityName);
1✔
343
          for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
1✔
344
            if (endpoint.isHealthy()) {
1✔
345
              endpointWeightSum += UnsignedInts.toLong(endpoint.loadBalancingWeight());
1✔
346
            }
347
          }
1✔
348
        }
349
        for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
1✔
350
          if (endpoint.isHealthy()) {
1✔
351
            discard = false;
1✔
352
            long weight;
353
            if (pickFirstWeightedShuffling) {
1✔
354
              // Combine locality and endpoint weights as defined by gRFC A113
355
              long localityWeight = fractionToFixedPoint(
1✔
356
                  UnsignedInts.toLong(localityLbInfo.localityWeight()), localityWeightSum);
1✔
357
              long endpointWeight = fractionToFixedPoint(
1✔
358
                  UnsignedInts.toLong(endpoint.loadBalancingWeight()), endpointWeightSum);
1✔
359
              weight = fixedPointMultiply(localityWeight, endpointWeight);
1✔
360
              if (weight == 0) {
1✔
361
                weight = 1;
×
362
              }
363
            } else {
1✔
364
              weight = localityLbInfo.localityWeight();
1✔
365
              if (endpoint.loadBalancingWeight() != 0) {
1✔
366
                weight *= endpoint.loadBalancingWeight();
1✔
367
              }
368
            }
369

370
            String localityName = localityName(locality);
1✔
371
            Attributes attr =
1✔
372
                endpoint.eag().getAttributes().toBuilder()
1✔
373
                    .set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, clusterName)
1✔
374
                    .set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY, locality)
1✔
375
                    .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName)
1✔
376
                    .set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY_WEIGHT,
1✔
377
                        localityLbInfo.localityWeight())
1✔
378
                    .set(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT, weight)
1✔
379
                    .set(XdsInternalAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
1✔
380
                    .build();
1✔
381
            EquivalentAddressGroup eag;
382
            if (discovery.isHttp11ProxyAvailable()) {
1✔
383
              List<SocketAddress> rewrittenAddresses = new ArrayList<>();
1✔
384
              for (SocketAddress addr : endpoint.eag().getAddresses()) {
1✔
385
                rewrittenAddresses.add(rewriteAddress(
1✔
386
                    addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
1✔
387
              }
1✔
388
              eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
1✔
389
            } else {
1✔
390
              eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
1✔
391
            }
392
            eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
393
            addresses.add(eag);
1✔
394
          }
395
        }
1✔
396
        if (discard) {
1✔
397
          logger.log(XdsLogLevel.INFO,
1✔
398
              "Discard locality {0} with 0 healthy endpoints", locality);
399
          continue;
1✔
400
        }
401
        if (!prioritizedLocalityWeights.containsKey(priorityName)) {
1✔
402
          prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
1✔
403
        }
404
        prioritizedLocalityWeights.get(priorityName).put(
1✔
405
            locality, localityLbInfo.localityWeight());
1✔
406
      }
1✔
407
      if (prioritizedLocalityWeights.isEmpty()) {
1✔
408
        // Will still update the result, as if the cluster resource is revoked.
409
        logger.log(XdsLogLevel.INFO,
1✔
410
            "Cluster {0} has no usable priority/locality/endpoint", clusterName);
411
      }
412
      sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
1✔
413
      Map<String, PriorityChildConfig> priorityChildConfigs =
1✔
414
          generatePriorityChildConfigs(
1✔
415
              clusterName, discovery, lbConfig, lbRegistry,
1✔
416
              prioritizedLocalityWeights, dropOverloads);
417
      return StatusOr.fromValue(new ClusterResolutionResult(addresses, priorityChildConfigs,
1✔
418
          sortedPriorityNames));
419
    }
420

421
    private SocketAddress rewriteAddress(SocketAddress addr,
422
        ImmutableMap<String, Object> endpointMetadata,
423
        ImmutableMap<String, Object> localityMetadata) {
424
      if (!(addr instanceof InetSocketAddress)) {
1✔
425
        return addr;
×
426
      }
427

428
      SocketAddress proxyAddress;
429
      try {
430
        proxyAddress = (SocketAddress) endpointMetadata.get(
1✔
431
            "envoy.http11_proxy_transport_socket.proxy_address");
432
        if (proxyAddress == null) {
1✔
433
          proxyAddress = (SocketAddress) localityMetadata.get(
1✔
434
              "envoy.http11_proxy_transport_socket.proxy_address");
435
        }
436
      } catch (ClassCastException e) {
×
437
        return addr;
×
438
      }
1✔
439

440
      if (proxyAddress == null) {
1✔
441
        return addr;
1✔
442
      }
443

444
      return HttpConnectProxiedSocketAddress.newBuilder()
1✔
445
          .setTargetAddress((InetSocketAddress) addr)
1✔
446
          .setProxyAddress(proxyAddress)
1✔
447
          .build();
1✔
448
    }
449

450
    private List<String> generatePriorityNames(String name,
451
        Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
452
      TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
1✔
453
      for (Locality locality : localityLbEndpoints.keySet()) {
1✔
454
        int priority = localityLbEndpoints.get(locality).priority();
1✔
455
        if (!todo.containsKey(priority)) {
1✔
456
          todo.put(priority, new ArrayList<>());
1✔
457
        }
458
        todo.get(priority).add(locality);
1✔
459
      }
1✔
460
      Map<Locality, String> newNames = new HashMap<>();
1✔
461
      Set<String> usedNames = new HashSet<>();
1✔
462
      List<String> ret = new ArrayList<>();
1✔
463
      for (Integer priority: todo.keySet()) {
1✔
464
        String foundName = "";
1✔
465
        for (Locality locality : todo.get(priority)) {
1✔
466
          if (localityPriorityNames.containsKey(locality)
1✔
467
              && usedNames.add(localityPriorityNames.get(locality))) {
1✔
468
            foundName = localityPriorityNames.get(locality);
1✔
469
            break;
1✔
470
          }
471
        }
1✔
472
        if ("".equals(foundName)) {
1✔
473
          foundName = priorityName(name, priorityNameGenId++);
1✔
474
        }
475
        for (Locality locality : todo.get(priority)) {
1✔
476
          newNames.put(locality, foundName);
1✔
477
        }
1✔
478
        ret.add(foundName);
1✔
479
      }
1✔
480
      localityPriorityNames = newNames;
1✔
481
      return ret;
1✔
482
    }
483
  }
484

485
  private static class ClusterResolutionResult {
486
    // Endpoint addresses.
487
    private final List<EquivalentAddressGroup> addresses;
488
    // Config (include load balancing policy/config) for each priority in the cluster.
489
    private final Map<String, PriorityChildConfig> priorityChildConfigs;
490
    // List of priority names ordered in descending priorities.
491
    private final List<String> priorities;
492

493
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
494
        Map<String, PriorityChildConfig> configs, List<String> priorities) {
1✔
495
      this.addresses = addresses;
1✔
496
      this.priorityChildConfigs = configs;
1✔
497
      this.priorities = priorities;
1✔
498
    }
1✔
499
  }
500

501
  /**
502
   * Generates configs to be used in the priority LB policy for priorities in a cluster.
503
   *
504
   * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
505
   * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental
506
   */
507
  private static Map<String, PriorityChildConfig> generatePriorityChildConfigs(
508
      String clusterName,
509
      CdsUpdate discovery,
510
      Object endpointLbConfig,
511
      LoadBalancerRegistry lbRegistry,
512
      Map<String, Map<Locality, Integer>> prioritizedLocalityWeights,
513
      List<DropOverload> dropOverloads) {
514
    Map<String, PriorityChildConfig> configs = new HashMap<>();
1✔
515
    for (String priority : prioritizedLocalityWeights.keySet()) {
1✔
516
      ClusterImplConfig clusterImplConfig =
1✔
517
          new ClusterImplConfig(
518
              clusterName, discovery.edsServiceName(), discovery.lrsServerInfo(),
1✔
519
              discovery.maxConcurrentRequests(), dropOverloads, endpointLbConfig,
1✔
520
              discovery.upstreamTlsContext(), discovery.filterMetadata(),
1✔
521
              discovery.backendMetricPropagation());
1✔
522
      LoadBalancerProvider clusterImplLbProvider =
1✔
523
          lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
524
      Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
525
          clusterImplLbProvider, clusterImplConfig);
526

527
      // If outlier detection has been configured we wrap the child policy in the outlier detection
528
      // load balancer.
529
      if (discovery.outlierDetection() != null) {
1✔
530
        LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider(
1✔
531
            "outlier_detection_experimental");
532
        priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
533
            outlierDetectionProvider,
534
            buildOutlierDetectionLbConfig(discovery.outlierDetection(), priorityChildPolicy));
1✔
535
      }
536

537
      boolean isEds = discovery.clusterType() == ClusterType.EDS;
1✔
538
      PriorityChildConfig priorityChildConfig =
1✔
539
          new PriorityChildConfig(priorityChildPolicy, isEds /* ignoreReresolution */);
540
      configs.put(priority, priorityChildConfig);
1✔
541
    }
1✔
542
    return configs;
1✔
543
  }
544

545
  /**
546
   * Converts {@link OutlierDetection} that represents the xDS configuration to {@link
547
   * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer}
548
   * understands.
549
   */
550
  private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig(
551
      OutlierDetection outlierDetection, Object childConfig) {
552
    OutlierDetectionLoadBalancerConfig.Builder configBuilder
1✔
553
        = new OutlierDetectionLoadBalancerConfig.Builder();
554

555
    configBuilder.setChildConfig(childConfig);
1✔
556

557
    if (outlierDetection.intervalNanos() != null) {
1✔
558
      configBuilder.setIntervalNanos(outlierDetection.intervalNanos());
1✔
559
    }
560
    if (outlierDetection.baseEjectionTimeNanos() != null) {
1✔
561
      configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos());
1✔
562
    }
563
    if (outlierDetection.maxEjectionTimeNanos() != null) {
1✔
564
      configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos());
1✔
565
    }
566
    if (outlierDetection.maxEjectionPercent() != null) {
1✔
567
      configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent());
1✔
568
    }
569

570
    SuccessRateEjection successRate = outlierDetection.successRateEjection();
1✔
571
    if (successRate != null) {
1✔
572
      OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder
573
          successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
574
          .SuccessRateEjection.Builder();
575

576
      if (successRate.stdevFactor() != null) {
1✔
577
        successRateConfigBuilder.setStdevFactor(successRate.stdevFactor());
1✔
578
      }
579
      if (successRate.enforcementPercentage() != null) {
1✔
580
        successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage());
1✔
581
      }
582
      if (successRate.minimumHosts() != null) {
1✔
583
        successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts());
1✔
584
      }
585
      if (successRate.requestVolume() != null) {
1✔
586
        successRateConfigBuilder.setRequestVolume(successRate.requestVolume());
1✔
587
      }
588

589
      configBuilder.setSuccessRateEjection(successRateConfigBuilder.build());
1✔
590
    }
591

592
    FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection();
1✔
593
    if (failurePercentage != null) {
1✔
594
      OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder
595
          failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
596
          .FailurePercentageEjection.Builder();
597

598
      if (failurePercentage.threshold() != null) {
1✔
599
        failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold());
1✔
600
      }
601
      if (failurePercentage.enforcementPercentage() != null) {
1✔
602
        failurePercentageConfigBuilder.setEnforcementPercentage(
1✔
603
            failurePercentage.enforcementPercentage());
1✔
604
      }
605
      if (failurePercentage.minimumHosts() != null) {
1✔
606
        failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts());
1✔
607
      }
608
      if (failurePercentage.requestVolume() != null) {
1✔
609
        failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume());
1✔
610
      }
611

612
      configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build());
1✔
613
    }
614

615
    return configBuilder.build();
1✔
616
  }
617
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc