• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19908

16 Jul 2025 07:54PM UTC coverage: 88.593% (+0.07%) from 88.528%
#19908

push

github

ejona86
Revert "xds: Convert CdsLb to XdsDepManager"

This reverts commit 297ab05ef.

b/430347751 shows multiple concerning behaviors in the xDS stack with
the new A74 config update model. XdsDepManager and CdsLB2 still seem to
be working correctly, but the change exacerbated issues in other
parts of the stack, like RingHashConfig not having equals fixed in
a8de9f07ab.

Revert only for the v1.74.x release, leaving it on master.

34647 of 39108 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.79
/../xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.InternalLogId;
25
import io.grpc.LoadBalancer;
26
import io.grpc.LoadBalancerRegistry;
27
import io.grpc.NameResolver;
28
import io.grpc.Status;
29
import io.grpc.SynchronizationContext;
30
import io.grpc.internal.ObjectPool;
31
import io.grpc.util.GracefulSwitchLoadBalancer;
32
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
33
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
34
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
35
import io.grpc.xds.XdsClusterResource.CdsUpdate;
36
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
37
import io.grpc.xds.client.XdsClient;
38
import io.grpc.xds.client.XdsClient.ResourceWatcher;
39
import io.grpc.xds.client.XdsLogger;
40
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
41
import java.util.ArrayDeque;
42
import java.util.ArrayList;
43
import java.util.Arrays;
44
import java.util.Collections;
45
import java.util.HashMap;
46
import java.util.HashSet;
47
import java.util.LinkedHashMap;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.Queue;
51
import java.util.Set;
52
import java.util.concurrent.ConcurrentHashMap;
53
import javax.annotation.Nullable;
54

55
/**
56
 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
57
 * The top-level cluster may be a plain EDS/logical-DNS cluster or an aggregate cluster formed
58
 * by a group of sub-clusters in a tree hierarchy.
59
 */
60
final class CdsLoadBalancer2 extends LoadBalancer {
61
  private final XdsLogger logger;
62
  private final Helper helper;
63
  private final SynchronizationContext syncContext;
64
  private final LoadBalancerRegistry lbRegistry;
65
  // Following fields are effectively final.
66
  private ObjectPool<XdsClient> xdsClientPool;
67
  private XdsClient xdsClient;
68
  private CdsLbState cdsLbState;
69
  private ResolvedAddresses resolvedAddresses;
70

71
  CdsLoadBalancer2(Helper helper) {
72
    this(helper, LoadBalancerRegistry.getDefaultRegistry());
1✔
73
  }
1✔
74

75
  @VisibleForTesting
76
  CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
1✔
77
    this.helper = checkNotNull(helper, "helper");
1✔
78
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
79
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
80
    logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
1✔
81
    logger.log(XdsLogLevel.INFO, "Created");
1✔
82
  }
1✔
83

84
  @Override
85
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
86
    if (this.resolvedAddresses != null) {
1✔
87
      return Status.OK;
1✔
88
    }
89
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
90
    this.resolvedAddresses = resolvedAddresses;
1✔
91
    xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL);
1✔
92
    xdsClient = xdsClientPool.getObject();
1✔
93
    CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
94
    logger.log(XdsLogLevel.INFO, "Config: {0}", config);
1✔
95
    cdsLbState = new CdsLbState(config.name);
1✔
96
    cdsLbState.start();
1✔
97
    return Status.OK;
1✔
98
  }
99

100
  @Override
101
  public void handleNameResolutionError(Status error) {
102
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
103
    if (cdsLbState != null && cdsLbState.childLb != null) {
1✔
104
      cdsLbState.childLb.handleNameResolutionError(error);
1✔
105
    } else {
106
      helper.updateBalancingState(
1✔
107
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
108
    }
109
  }
1✔
110

111
  @Override
112
  public void shutdown() {
113
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
114
    if (cdsLbState != null) {
1✔
115
      cdsLbState.shutdown();
1✔
116
    }
117
    if (xdsClientPool != null) {
1✔
118
      xdsClientPool.returnObject(xdsClient);
1✔
119
    }
120
  }
1✔
121

122
  /**
123
   * The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when
124
   * receiving the CDS LB policy config with the top-level cluster name.
125
   */
126
  private final class CdsLbState {
127

128
    private final ClusterState root;
129
    private final Map<String, ClusterState> clusterStates = new ConcurrentHashMap<>();
1✔
130
    private LoadBalancer childLb;
131

132
    private CdsLbState(String rootCluster) {
1✔
133
      root = new ClusterState(rootCluster);
1✔
134
    }
1✔
135

136
    private void start() {
137
      root.start();
1✔
138
    }
1✔
139

140
    private void shutdown() {
141
      root.shutdown();
1✔
142
      if (childLb != null) {
1✔
143
        childLb.shutdown();
1✔
144
      }
145
    }
1✔
146

147
    private void handleClusterDiscovered() {
148
      List<DiscoveryMechanism> instances = new ArrayList<>();
1✔
149

150
      // Used for loop detection to break the infinite recursion that loops would cause
151
      Map<ClusterState, List<ClusterState>> parentClusters = new HashMap<>();
1✔
152
      Status loopStatus = null;
1✔
153

154
      // Level-order traversal.
155
      // Collect configurations for all non-aggregate (leaf) clusters.
156
      Queue<ClusterState> queue = new ArrayDeque<>();
1✔
157
      queue.add(root);
1✔
158
      while (!queue.isEmpty()) {
1✔
159
        int size = queue.size();
1✔
160
        for (int i = 0; i < size; i++) {
1✔
161
          ClusterState clusterState = queue.remove();
1✔
162
          if (!clusterState.discovered) {
1✔
163
            return;  // do not proceed until all clusters discovered
1✔
164
          }
165
          if (clusterState.result == null) {  // resource revoked or not exists
1✔
166
            continue;
1✔
167
          }
168
          if (clusterState.isLeaf) {
1✔
169
            if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) {
1✔
170
              DiscoveryMechanism instance;
171
              if (clusterState.result.clusterType() == ClusterType.EDS) {
1✔
172
                instance = DiscoveryMechanism.forEds(
1✔
173
                    clusterState.name, clusterState.result.edsServiceName(),
1✔
174
                    clusterState.result.lrsServerInfo(),
1✔
175
                    clusterState.result.maxConcurrentRequests(),
1✔
176
                    clusterState.result.upstreamTlsContext(),
1✔
177
                    clusterState.result.filterMetadata(),
1✔
178
                    clusterState.result.outlierDetection());
1✔
179
              } else {  // logical DNS
180
                instance = DiscoveryMechanism.forLogicalDns(
1✔
181
                    clusterState.name, clusterState.result.dnsHostName(),
1✔
182
                    clusterState.result.lrsServerInfo(),
1✔
183
                    clusterState.result.maxConcurrentRequests(),
1✔
184
                    clusterState.result.upstreamTlsContext(),
1✔
185
                    clusterState.result.filterMetadata());
1✔
186
              }
187
              instances.add(instance);
1✔
188
            }
1✔
189
          } else {
190
            if (clusterState.childClusterStates == null) {
1✔
191
              continue;
×
192
            }
193
            // Do loop detection and break recursion if detected
194
            List<String> namesCausingLoops = identifyLoops(clusterState, parentClusters);
1✔
195
            if (namesCausingLoops.isEmpty()) {
1✔
196
              queue.addAll(clusterState.childClusterStates.values());
1✔
197
            } else {
198
              // Do cleanup
199
              if (childLb != null) {
1✔
200
                childLb.shutdown();
1✔
201
                childLb = null;
1✔
202
              }
203
              if (loopStatus != null) {
1✔
204
                logger.log(XdsLogLevel.WARNING,
×
205
                    "Multiple loops in CDS config.  Old msg:  " + loopStatus.getDescription());
×
206
              }
207
              loopStatus = Status.UNAVAILABLE.withDescription(String.format(
1✔
208
                  "CDS error: circular aggregate clusters directly under %s for "
209
                      + "root cluster %s, named %s, xDS node ID: %s",
210
                  clusterState.name, root.name, namesCausingLoops,
1✔
211
                  xdsClient.getBootstrapInfo().node().getId()));
1✔
212
            }
213
          }
214
        }
215
      }
1✔
216

217
      if (loopStatus != null) {
1✔
218
        helper.updateBalancingState(
1✔
219
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
1✔
220
        return;
1✔
221
      }
222

223
      if (instances.isEmpty()) {  // none of non-aggregate clusters exists
1✔
224
        if (childLb != null) {
1✔
225
          childLb.shutdown();
1✔
226
          childLb = null;
1✔
227
        }
228
        Status unavailable = Status.UNAVAILABLE.withDescription(String.format(
1✔
229
            "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s"
230
                + " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId()));
1✔
231
        helper.updateBalancingState(
1✔
232
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
1✔
233
        return;
1✔
234
      }
235

236
      // The LB policy config is provided in service_config.proto/JSON format.
237
      NameResolver.ConfigOrError configOrError =
1✔
238
          GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
1✔
239
              Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
1✔
240
      if (configOrError.getError() != null) {
1✔
241
        throw configOrError.getError().augmentDescription("Unable to parse the LB config")
1✔
242
            .asRuntimeException();
1✔
243
      }
244

245
      ClusterResolverConfig config = new ClusterResolverConfig(
1✔
246
          Collections.unmodifiableList(instances),
1✔
247
          configOrError.getConfig(),
1✔
248
          root.result.isHttp11ProxyAvailable());
1✔
249
      if (childLb == null) {
1✔
250
        childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
1✔
251
      }
252
      childLb.handleResolvedAddresses(
1✔
253
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
1✔
254
    }
1✔
255

256
    /**
257
     * Returns children that would cause loops and builds up the parentClusters map.
258
     **/
259

260
    private List<String> identifyLoops(ClusterState clusterState,
261
        Map<ClusterState, List<ClusterState>> parentClusters) {
262
      Set<String> ancestors = new HashSet<>();
1✔
263
      ancestors.add(clusterState.name);
1✔
264
      addAncestors(ancestors, clusterState, parentClusters);
1✔
265

266
      List<String> namesCausingLoops = new ArrayList<>();
1✔
267
      for (ClusterState state : clusterState.childClusterStates.values()) {
1✔
268
        if (ancestors.contains(state.name)) {
1✔
269
          namesCausingLoops.add(state.name);
1✔
270
        }
271
      }
1✔
272

273
      // Update parent map with entries from remaining children to clusterState
274
      clusterState.childClusterStates.values().stream()
1✔
275
          .filter(child -> !namesCausingLoops.contains(child.name))
1✔
276
          .forEach(
1✔
277
              child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>())
1✔
278
                  .add(clusterState));
1✔
279

280
      return namesCausingLoops;
1✔
281
    }
282

283
    /** Recursively add all parents to the ancestors list. **/
284
    private void addAncestors(Set<String> ancestors, ClusterState clusterState,
285
        Map<ClusterState, List<ClusterState>> parentClusters) {
286
      List<ClusterState> directParents = parentClusters.get(clusterState);
1✔
287
      if (directParents != null) {
1✔
288
        directParents.stream().map(c -> c.name).forEach(ancestors::add);
1✔
289
        directParents.forEach(p -> addAncestors(ancestors, p, parentClusters));
1✔
290
      }
291
    }
1✔
292

293
    private void handleClusterDiscoveryError(Status error) {
294
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
295
      Status errorWithNodeId = error.withDescription(
1✔
296
              description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
297
      if (childLb != null) {
1✔
298
        childLb.handleNameResolutionError(errorWithNodeId);
1✔
299
      } else {
300
        helper.updateBalancingState(
1✔
301
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId)));
1✔
302
      }
303
    }
1✔
304

305
    private final class ClusterState implements ResourceWatcher<CdsUpdate> {
306
      private final String name;
307
      @Nullable
308
      private Map<String, ClusterState> childClusterStates;
309
      @Nullable
310
      private CdsUpdate result;
311
      // Following fields are effectively final.
312
      private boolean isLeaf;
313
      private boolean discovered;
314
      private boolean shutdown;
315

316
      private ClusterState(String name) {
1✔
317
        this.name = name;
1✔
318
      }
1✔
319

320
      private void start() {
321
        shutdown = false;
1✔
322
        xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
1✔
323
      }
1✔
324

325
      void shutdown() {
326
        shutdown = true;
1✔
327
        xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
1✔
328
        if (childClusterStates != null) {
1✔
329
          // recursively shut down all descendants
330
          childClusterStates.values().stream()
1✔
331
              .filter(state -> !state.shutdown)
1✔
332
              .forEach(ClusterState::shutdown);
1✔
333
        }
334
      }
1✔
335

336
      @Override
337
      public void onError(Status error) {
338
        Status status = Status.UNAVAILABLE
1✔
339
            .withDescription(
1✔
340
                String.format("Unable to load CDS %s. xDS server returned: %s: %s",
1✔
341
                  name, error.getCode(), error.getDescription()))
1✔
342
            .withCause(error.getCause());
1✔
343
        if (shutdown) {
1✔
344
          return;
×
345
        }
346
        // All watchers should receive the same error, so we only propagate it once.
347
        if (ClusterState.this == root) {
1✔
348
          handleClusterDiscoveryError(status);
1✔
349
        }
350
      }
1✔
351

352
      @Override
353
      public void onResourceDoesNotExist(String resourceName) {
354
        if (shutdown) {
1✔
355
          return;
×
356
        }
357
        discovered = true;
1✔
358
        result = null;
1✔
359
        if (childClusterStates != null) {
1✔
360
          for (ClusterState state : childClusterStates.values()) {
1✔
361
            state.shutdown();
1✔
362
          }
1✔
363
          childClusterStates = null;
1✔
364
        }
365
        handleClusterDiscovered();
1✔
366
      }
1✔
367

368
      @Override
369
      public void onChanged(final CdsUpdate update) {
370
        if (shutdown) {
1✔
371
          return;
×
372
        }
373
        logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
1✔
374
        discovered = true;
1✔
375
        result = update;
1✔
376
        if (update.clusterType() == ClusterType.AGGREGATE) {
1✔
377
          isLeaf = false;
1✔
378
          logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
1✔
379
              update.clusterName(), update.prioritizedClusterNames());
1✔
380
          Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
1✔
381
          for (String cluster : update.prioritizedClusterNames()) {
1✔
382
            if (newChildStates.containsKey(cluster)) {
1✔
383
              logger.log(XdsLogLevel.WARNING,
1✔
384
                  String.format("duplicate cluster name %s in aggregate %s is being ignored",
1✔
385
                      cluster, update.clusterName()));
1✔
386
              continue;
1✔
387
            }
388
            if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
1✔
389
              ClusterState childState;
390
              if (clusterStates.containsKey(cluster)) {
1✔
391
                childState = clusterStates.get(cluster);
1✔
392
                if (childState.shutdown) {
1✔
393
                  childState.start();
×
394
                }
395
              } else {
396
                childState = new ClusterState(cluster);
1✔
397
                clusterStates.put(cluster, childState);
1✔
398
                childState.start();
1✔
399
              }
400
              newChildStates.put(cluster, childState);
1✔
401
            } else {
1✔
402
              newChildStates.put(cluster, childClusterStates.remove(cluster));
1✔
403
            }
404
          }
1✔
405
          if (childClusterStates != null) {  // stop subscribing to revoked child clusters
1✔
406
            for (ClusterState watcher : childClusterStates.values()) {
1✔
407
              watcher.shutdown();
1✔
408
            }
1✔
409
          }
410
          childClusterStates = newChildStates;
1✔
411
        } else if (update.clusterType() == ClusterType.EDS) {
1✔
412
          isLeaf = true;
1✔
413
          logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
1✔
414
              update.clusterName(), update.edsServiceName());
1✔
415
        } else {  // logical DNS
416
          isLeaf = true;
1✔
417
          logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
1✔
418
        }
419
        handleClusterDiscovered();
1✔
420
      }
1✔
421

422
    }
423
  }
424
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc