• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19868

17 Jun 2025 01:36PM UTC coverage: 88.559% (-0.01%) from 88.571%
#19868

push

github

ejona86
xds: XdsNR should be subscribing to clusters with XdsDepManager

This adds behavior defined in gRFC A74 that was missing:

> As per gRFC A31, the ConfigSelector gives each RPC a ref to the
> cluster that was selected for it to ensure that the cluster is not
> removed from the xds_cluster_manager LB policy config before the RPC
> is done with its LB picks. These cluster refs will also hold a
> subscription for the cluster from the XdsDependencyManager, so that
> the XdsDependencyManager will not stop watching the cluster resource
> until the cluster is removed from the xds_cluster_manager LB policy
> config.

Without this logic, RPCs can race and see the error:

> INTERNAL: CdsLb for cluster0: Unable to find non-dynamic root cluster

Fixes #12152. This fixes the regression introduced in 297ab05e

34552 of 39016 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/../xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.collect.ImmutableList;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer;
28
import io.grpc.LoadBalancerRegistry;
29
import io.grpc.NameResolver;
30
import io.grpc.Status;
31
import io.grpc.StatusOr;
32
import io.grpc.util.GracefulSwitchLoadBalancer;
33
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
34
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
35
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
36
import io.grpc.xds.XdsClusterResource.CdsUpdate;
37
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
38
import io.grpc.xds.XdsConfig.Subscription;
39
import io.grpc.xds.XdsConfig.XdsClusterConfig;
40
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
41
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
42
import io.grpc.xds.client.XdsLogger;
43
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
44
import java.util.ArrayList;
45
import java.util.Arrays;
46
import java.util.Collections;
47
import java.util.List;
48

49
/**
50
 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
51
 * The top-level cluster may be a plain EDS/logical-DNS cluster or an aggregate cluster formed
52
 * by a group of sub-clusters in a tree hierarchy.
53
 */
54
final class CdsLoadBalancer2 extends LoadBalancer {
55
  private final XdsLogger logger;
56
  private final Helper helper;
57
  private final LoadBalancerRegistry lbRegistry;
58
  // Following fields are effectively final.
59
  private String clusterName;
60
  private Subscription clusterSubscription;
61
  private LoadBalancer childLb;
62

63
  CdsLoadBalancer2(Helper helper) {
64
    this(helper, LoadBalancerRegistry.getDefaultRegistry());
1✔
65
  }
1✔
66

67
  @VisibleForTesting
68
  CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
1✔
69
    this.helper = checkNotNull(helper, "helper");
1✔
70
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
71
    logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
1✔
72
    logger.log(XdsLogLevel.INFO, "Created");
1✔
73
  }
1✔
74

75
  @Override
76
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
77
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
78
    if (this.clusterName == null) {
1✔
79
      CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
80
      logger.log(XdsLogLevel.INFO, "Config: {0}", config);
1✔
81
      if (config.isDynamic) {
1✔
82
        clusterSubscription = resolvedAddresses.getAttributes()
1✔
83
            .get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY)
1✔
84
            .subscribeToCluster(config.name);
1✔
85
      }
86
      this.clusterName = config.name;
1✔
87
    }
88
    XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG);
1✔
89
    StatusOr<XdsClusterConfig> clusterConfigOr = xdsConfig.getClusters().get(clusterName);
1✔
90
    if (clusterConfigOr == null) {
1✔
91
      if (clusterSubscription == null) {
1✔
92
        // Should be impossible, because XdsDependencyManager wouldn't have generated this
93
        return fail(Status.INTERNAL.withDescription(
×
94
            errorPrefix() + "Unable to find non-dynamic root cluster"));
×
95
      }
96
      // The dynamic cluster must not have loaded yet
97
      return Status.OK;
1✔
98
    }
99
    if (!clusterConfigOr.hasValue()) {
1✔
100
      return fail(clusterConfigOr.getStatus());
1✔
101
    }
102
    XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
1✔
103
    List<String> leafNames;
104
    if (clusterConfig.getChildren() instanceof AggregateConfig) {
1✔
105
      leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
1✔
106
    } else if (clusterConfig.getChildren() instanceof EndpointConfig) {
1✔
107
      leafNames = ImmutableList.of(clusterName);
1✔
108
    } else {
109
      return fail(Status.INTERNAL.withDescription(
×
110
          errorPrefix() + "Unexpected cluster children type: "
×
111
          + clusterConfig.getChildren().getClass()));
×
112
    }
113
    if (leafNames.isEmpty()) {
1✔
114
      // Should be impossible, because XdsClusterResource validated this
115
      return fail(Status.UNAVAILABLE.withDescription(
×
116
          errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
×
117
    }
118

119
    Status noneFoundError = Status.INTERNAL
1✔
120
        .withDescription(errorPrefix() + "No leaves and no error; this is a bug");
1✔
121
    List<DiscoveryMechanism> instances = new ArrayList<>();
1✔
122
    for (String leafName : leafNames) {
1✔
123
      StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
1✔
124
      if (!leafConfigOr.hasValue()) {
1✔
125
        noneFoundError = leafConfigOr.getStatus();
1✔
126
        continue;
1✔
127
      }
128
      if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
1✔
129
        noneFoundError = Status.INTERNAL.withDescription(
×
130
            errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
×
131
            + leafConfigOr.getValue().getChildren().getClass());
×
132
        continue;
×
133
      }
134
      CdsUpdate result = leafConfigOr.getValue().getClusterResource();
1✔
135
      DiscoveryMechanism instance;
136
      if (result.clusterType() == ClusterType.EDS) {
1✔
137
        instance = DiscoveryMechanism.forEds(
1✔
138
            leafName,
139
            result.edsServiceName(),
1✔
140
            result.lrsServerInfo(),
1✔
141
            result.maxConcurrentRequests(),
1✔
142
            result.upstreamTlsContext(),
1✔
143
            result.filterMetadata(),
1✔
144
            result.outlierDetection());
1✔
145
      } else {
146
        instance = DiscoveryMechanism.forLogicalDns(
1✔
147
            leafName,
148
            result.dnsHostName(),
1✔
149
            result.lrsServerInfo(),
1✔
150
            result.maxConcurrentRequests(),
1✔
151
            result.upstreamTlsContext(),
1✔
152
            result.filterMetadata());
1✔
153
      }
154
      instances.add(instance);
1✔
155
    }
1✔
156
    if (instances.isEmpty()) {
1✔
157
      return fail(noneFoundError);
1✔
158
    }
159

160
    // The LB policy config is provided in service_config.proto/JSON format.
161
    NameResolver.ConfigOrError configOrError =
1✔
162
        GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
1✔
163
            Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
1✔
164
    if (configOrError.getError() != null) {
1✔
165
      // Should be impossible, because XdsClusterResource validated this
166
      return fail(Status.INTERNAL.withDescription(
×
167
          errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
×
168
    }
169

170
    ClusterResolverConfig config = new ClusterResolverConfig(
1✔
171
        Collections.unmodifiableList(instances),
1✔
172
        configOrError.getConfig(),
1✔
173
        clusterConfig.getClusterResource().isHttp11ProxyAvailable());
1✔
174
    if (childLb == null) {
1✔
175
      childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
1✔
176
    }
177
    return childLb.acceptResolvedAddresses(
1✔
178
        resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
1✔
179
  }
180

181
  @Override
182
  public void handleNameResolutionError(Status error) {
183
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
184
    if (childLb != null) {
1✔
185
      childLb.handleNameResolutionError(error);
1✔
186
    } else {
187
      helper.updateBalancingState(
1✔
188
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
189
    }
190
  }
1✔
191

192
  @Override
193
  public void shutdown() {
194
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
195
    if (childLb != null) {
1✔
196
      childLb.shutdown();
1✔
197
      childLb = null;
1✔
198
    }
199
    if (clusterSubscription != null) {
1✔
200
      clusterSubscription.close();
1✔
201
      clusterSubscription = null;
1✔
202
    }
203
  }
1✔
204

205
  @CheckReturnValue // don't forget to return up the stack after the fail call
206
  private Status fail(Status error) {
207
    if (childLb != null) {
1✔
208
      childLb.shutdown();
1✔
209
      childLb = null;
1✔
210
    }
211
    helper.updateBalancingState(
1✔
212
        TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
213
    return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
1✔
214
  }
215

216
  private String errorPrefix() {
217
    return "CdsLb for " + clusterName + ": ";
1✔
218
  }
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc