• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19455

12 Sep 2024 10:40PM UTC coverage: 84.521% (-0.003%) from 84.524%
#19455

push

github

web-flow
xds: Add xDS node ID in few control plane errors (#11519)

33555 of 39700 relevant lines covered (84.52%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.3
/../xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.InternalLogId;
25
import io.grpc.LoadBalancer;
26
import io.grpc.LoadBalancerRegistry;
27
import io.grpc.NameResolver;
28
import io.grpc.Status;
29
import io.grpc.SynchronizationContext;
30
import io.grpc.internal.ObjectPool;
31
import io.grpc.util.GracefulSwitchLoadBalancer;
32
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
33
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
34
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
35
import io.grpc.xds.XdsClusterResource.CdsUpdate;
36
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
37
import io.grpc.xds.client.XdsClient;
38
import io.grpc.xds.client.XdsClient.ResourceWatcher;
39
import io.grpc.xds.client.XdsLogger;
40
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
41
import java.util.ArrayDeque;
42
import java.util.ArrayList;
43
import java.util.Arrays;
44
import java.util.Collections;
45
import java.util.HashMap;
46
import java.util.HashSet;
47
import java.util.LinkedHashMap;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.Queue;
51
import java.util.Set;
52
import java.util.concurrent.ConcurrentHashMap;
53
import javax.annotation.Nullable;
54

55
/**
56
 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
57
 * The top-level cluster may be a plain EDS/logical-DNS cluster or an aggregate cluster formed
58
 * by a group of sub-clusters in a tree hierarchy.
59
 */
60
final class CdsLoadBalancer2 extends LoadBalancer {
61
  private final XdsLogger logger;
62
  private final Helper helper;
63
  private final SynchronizationContext syncContext;
64
  private final LoadBalancerRegistry lbRegistry;
65
  // Following fields are assigned once, on the first acceptResolvedAddresses() call.
66
  private ObjectPool<XdsClient> xdsClientPool;
67
  private XdsClient xdsClient;
68
  private CdsLbState cdsLbState;
69
  private ResolvedAddresses resolvedAddresses;
70

71
  CdsLoadBalancer2(Helper helper) {
72
    this(helper, LoadBalancerRegistry.getDefaultRegistry());
1✔
73
  }
1✔
74

75
  @VisibleForTesting
76
  CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
1✔
77
    this.helper = checkNotNull(helper, "helper");
1✔
78
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
79
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
80
    logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
1✔
81
    logger.log(XdsLogLevel.INFO, "Created");
1✔
82
  }
1✔
83

84
  @Override
85
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
86
    if (this.resolvedAddresses != null) {
1✔
87
      return Status.OK;
×
88
    }
89
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
90
    this.resolvedAddresses = resolvedAddresses;
1✔
91
    xdsClientPool = resolvedAddresses.getAttributes().get(InternalXdsAttributes.XDS_CLIENT_POOL);
1✔
92
    xdsClient = xdsClientPool.getObject();
1✔
93
    CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
94
    logger.log(XdsLogLevel.INFO, "Config: {0}", config);
1✔
95
    cdsLbState = new CdsLbState(config.name);
1✔
96
    cdsLbState.start();
1✔
97
    return Status.OK;
1✔
98
  }
99

100
  @Override
101
  public void handleNameResolutionError(Status error) {
102
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
103
    if (cdsLbState != null && cdsLbState.childLb != null) {
1✔
104
      cdsLbState.childLb.handleNameResolutionError(error);
1✔
105
    } else {
106
      helper.updateBalancingState(
1✔
107
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
108
    }
109
  }
1✔
110

111
  @Override
112
  public void shutdown() {
113
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
114
    if (cdsLbState != null) {
1✔
115
      cdsLbState.shutdown();
1✔
116
    }
117
    if (xdsClientPool != null) {
1✔
118
      xdsClientPool.returnObject(xdsClient);
1✔
119
    }
120
  }
1✔
121

122
  /**
123
   * The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when
124
   * receiving the CDS LB policy config with the top-level cluster name.
125
   */
126
  private final class CdsLbState {
127

128
    private final ClusterState root;
129
    private final Map<String, ClusterState> clusterStates = new ConcurrentHashMap<>();
1✔
130
    private LoadBalancer childLb;
131

132
    /**
     * Creates the session state for the given top-level cluster. No watch is registered
     * until {@link #start()} is called.
     */
    private CdsLbState(String rootCluster) {
      this.root = new ClusterState(rootCluster);
    }
1✔
135

136
    private void start() {
137
      root.start();
1✔
138
    }
1✔
139

140
    private void shutdown() {
141
      root.shutdown();
1✔
142
      if (childLb != null) {
1✔
143
        childLb.shutdown();
1✔
144
      }
145
    }
1✔
146

147
    private void handleClusterDiscovered() {
148
      List<DiscoveryMechanism> instances = new ArrayList<>();
1✔
149

150
      // Used for loop detection to break the infinite recursion that loops would cause
151
      Map<ClusterState, List<ClusterState>> parentClusters = new HashMap<>();
1✔
152
      Status loopStatus = null;
1✔
153

154
      // Level-order traversal.
155
      // Collect configurations for all non-aggregate (leaf) clusters.
156
      Queue<ClusterState> queue = new ArrayDeque<>();
1✔
157
      queue.add(root);
1✔
158
      while (!queue.isEmpty()) {
1✔
159
        int size = queue.size();
1✔
160
        for (int i = 0; i < size; i++) {
1✔
161
          ClusterState clusterState = queue.remove();
1✔
162
          if (!clusterState.discovered) {
1✔
163
            return;  // do not proceed until all clusters discovered
1✔
164
          }
165
          if (clusterState.result == null) {  // resource revoked or not exists
1✔
166
            continue;
1✔
167
          }
168
          if (clusterState.isLeaf) {
1✔
169
            if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) {
1✔
170
              DiscoveryMechanism instance;
171
              if (clusterState.result.clusterType() == ClusterType.EDS) {
1✔
172
                instance = DiscoveryMechanism.forEds(
1✔
173
                    clusterState.name, clusterState.result.edsServiceName(),
1✔
174
                    clusterState.result.lrsServerInfo(),
1✔
175
                    clusterState.result.maxConcurrentRequests(),
1✔
176
                    clusterState.result.upstreamTlsContext(),
1✔
177
                    clusterState.result.filterMetadata(),
1✔
178
                    clusterState.result.outlierDetection());
1✔
179
              } else {  // logical DNS
180
                instance = DiscoveryMechanism.forLogicalDns(
1✔
181
                    clusterState.name, clusterState.result.dnsHostName(),
1✔
182
                    clusterState.result.lrsServerInfo(),
1✔
183
                    clusterState.result.maxConcurrentRequests(),
1✔
184
                    clusterState.result.upstreamTlsContext(),
1✔
185
                    clusterState.result.filterMetadata());
1✔
186
              }
187
              instances.add(instance);
1✔
188
            }
1✔
189
          } else {
190
            if (clusterState.childClusterStates == null) {
1✔
191
              continue;
×
192
            }
193
            // Do loop detection and break recursion if detected
194
            List<String> namesCausingLoops = identifyLoops(clusterState, parentClusters);
1✔
195
            if (namesCausingLoops.isEmpty()) {
1✔
196
              queue.addAll(clusterState.childClusterStates.values());
1✔
197
            } else {
198
              // Do cleanup
199
              if (childLb != null) {
1✔
200
                childLb.shutdown();
1✔
201
                childLb = null;
1✔
202
              }
203
              if (loopStatus != null) {
1✔
204
                logger.log(XdsLogLevel.WARNING,
×
205
                    "Multiple loops in CDS config.  Old msg:  " + loopStatus.getDescription());
×
206
              }
207
              loopStatus = Status.UNAVAILABLE.withDescription(String.format(
1✔
208
                  "CDS error: circular aggregate clusters directly under %s for "
209
                      + "root cluster %s, named %s, xDS node ID: %s",
210
                  clusterState.name, root.name, namesCausingLoops,
1✔
211
                  xdsClient.getBootstrapInfo().node().getId()));
1✔
212
            }
213
          }
214
        }
215
      }
1✔
216

217
      if (loopStatus != null) {
1✔
218
        helper.updateBalancingState(
1✔
219
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
1✔
220
        return;
1✔
221
      }
222

223
      if (instances.isEmpty()) {  // none of non-aggregate clusters exists
1✔
224
        if (childLb != null) {
1✔
225
          childLb.shutdown();
1✔
226
          childLb = null;
1✔
227
        }
228
        Status unavailable = Status.UNAVAILABLE.withDescription(String.format(
1✔
229
            "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s"
230
                + " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId()));
1✔
231
        helper.updateBalancingState(
1✔
232
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
1✔
233
        return;
1✔
234
      }
235

236
      // The LB policy config is provided in service_config.proto/JSON format.
237
      NameResolver.ConfigOrError configOrError =
1✔
238
          GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
1✔
239
              Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
1✔
240
      if (configOrError.getError() != null) {
1✔
241
        throw configOrError.getError().augmentDescription("Unable to parse the LB config")
1✔
242
            .asRuntimeException();
1✔
243
      }
244

245
      ClusterResolverConfig config = new ClusterResolverConfig(
1✔
246
          Collections.unmodifiableList(instances), configOrError.getConfig());
1✔
247
      if (childLb == null) {
1✔
248
        childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
1✔
249
      }
250
      childLb.handleResolvedAddresses(
1✔
251
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
1✔
252
    }
1✔
253

254
    /**
255
     * Returns children that would cause loops and builds up the parentClusters map.
256
     **/
257

258
    private List<String> identifyLoops(ClusterState clusterState,
259
        Map<ClusterState, List<ClusterState>> parentClusters) {
260
      Set<String> ancestors = new HashSet<>();
1✔
261
      ancestors.add(clusterState.name);
1✔
262
      addAncestors(ancestors, clusterState, parentClusters);
1✔
263

264
      List<String> namesCausingLoops = new ArrayList<>();
1✔
265
      for (ClusterState state : clusterState.childClusterStates.values()) {
1✔
266
        if (ancestors.contains(state.name)) {
1✔
267
          namesCausingLoops.add(state.name);
1✔
268
        }
269
      }
1✔
270

271
      // Update parent map with entries from remaining children to clusterState
272
      clusterState.childClusterStates.values().stream()
1✔
273
          .filter(child -> !namesCausingLoops.contains(child.name))
1✔
274
          .forEach(
1✔
275
              child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>())
1✔
276
                  .add(clusterState));
1✔
277

278
      return namesCausingLoops;
1✔
279
    }
280

281
    /** Recursively add all parents to the ancestors list. **/
282
    private void addAncestors(Set<String> ancestors, ClusterState clusterState,
283
        Map<ClusterState, List<ClusterState>> parentClusters) {
284
      List<ClusterState> directParents = parentClusters.get(clusterState);
1✔
285
      if (directParents != null) {
1✔
286
        directParents.stream().map(c -> c.name).forEach(ancestors::add);
1✔
287
        directParents.forEach(p -> addAncestors(ancestors, p, parentClusters));
1✔
288
      }
289
    }
1✔
290

291
    private void handleClusterDiscoveryError(Status error) {
292
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
293
      Status errorWithNodeId = error.withDescription(
1✔
294
              description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
295
      if (childLb != null) {
1✔
296
        childLb.handleNameResolutionError(errorWithNodeId);
1✔
297
      } else {
298
        helper.updateBalancingState(
1✔
299
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId)));
1✔
300
      }
301
    }
1✔
302

303
    private final class ClusterState implements ResourceWatcher<CdsUpdate> {
304
      private final String name;
305
      @Nullable
306
      private Map<String, ClusterState> childClusterStates;
307
      @Nullable
308
      private CdsUpdate result;
309
      // Mutable state, updated by start()/shutdown() and the watcher callbacks.
310
      private boolean isLeaf;
311
      private boolean discovered;
312
      private boolean shutdown;
313

314
      /**
       * Creates the state for the cluster with the given name. No watch is registered until
       * {@link #start()} is called.
       */
      private ClusterState(String name) {
        this.name = name;
      }
1✔
317

318
      private void start() {
319
        shutdown = false;
1✔
320
        xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
1✔
321
      }
1✔
322

323
      void shutdown() {
324
        shutdown = true;
1✔
325
        xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
1✔
326
        if (childClusterStates != null) {
1✔
327
          // recursively shut down all descendants
328
          childClusterStates.values().stream()
1✔
329
              .filter(state -> !state.shutdown)
1✔
330
              .forEach(ClusterState::shutdown);
1✔
331
        }
332
      }
1✔
333

334
      @Override
335
      public void onError(Status error) {
336
        Status status = Status.UNAVAILABLE
1✔
337
            .withDescription(
1✔
338
                String.format("Unable to load CDS %s. xDS server returned: %s: %s",
1✔
339
                  name, error.getCode(), error.getDescription()))
1✔
340
            .withCause(error.getCause());
1✔
341
        if (shutdown) {
1✔
342
          return;
×
343
        }
344
        // All watchers should receive the same error, so we only propagate it once.
345
        if (ClusterState.this == root) {
1✔
346
          handleClusterDiscoveryError(status);
1✔
347
        }
348
      }
1✔
349

350
      @Override
351
      public void onResourceDoesNotExist(String resourceName) {
352
        if (shutdown) {
1✔
353
          return;
×
354
        }
355
        discovered = true;
1✔
356
        result = null;
1✔
357
        if (childClusterStates != null) {
1✔
358
          for (ClusterState state : childClusterStates.values()) {
1✔
359
            state.shutdown();
1✔
360
          }
1✔
361
          childClusterStates = null;
1✔
362
        }
363
        handleClusterDiscovered();
1✔
364
      }
1✔
365

366
      @Override
367
      public void onChanged(final CdsUpdate update) {
368
        if (shutdown) {
1✔
369
          return;
×
370
        }
371
        logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
1✔
372
        discovered = true;
1✔
373
        result = update;
1✔
374
        if (update.clusterType() == ClusterType.AGGREGATE) {
1✔
375
          isLeaf = false;
1✔
376
          logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
1✔
377
              update.clusterName(), update.prioritizedClusterNames());
1✔
378
          Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
1✔
379
          for (String cluster : update.prioritizedClusterNames()) {
1✔
380
            if (newChildStates.containsKey(cluster)) {
1✔
381
              logger.log(XdsLogLevel.WARNING,
1✔
382
                  String.format("duplicate cluster name %s in aggregate %s is being ignored",
1✔
383
                      cluster, update.clusterName()));
1✔
384
              continue;
1✔
385
            }
386
            if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
1✔
387
              ClusterState childState;
388
              if (clusterStates.containsKey(cluster)) {
1✔
389
                childState = clusterStates.get(cluster);
1✔
390
                if (childState.shutdown) {
1✔
391
                  childState.start();
×
392
                }
393
              } else {
394
                childState = new ClusterState(cluster);
1✔
395
                clusterStates.put(cluster, childState);
1✔
396
                childState.start();
1✔
397
              }
398
              newChildStates.put(cluster, childState);
1✔
399
            } else {
1✔
400
              newChildStates.put(cluster, childClusterStates.remove(cluster));
1✔
401
            }
402
          }
1✔
403
          if (childClusterStates != null) {  // stop subscribing to revoked child clusters
1✔
404
            for (ClusterState watcher : childClusterStates.values()) {
1✔
405
              watcher.shutdown();
1✔
406
            }
1✔
407
          }
408
          childClusterStates = newChildStates;
1✔
409
        } else if (update.clusterType() == ClusterType.EDS) {
1✔
410
          isLeaf = true;
1✔
411
          logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
1✔
412
              update.clusterName(), update.edsServiceName());
1✔
413
        } else {  // logical DNS
414
          isLeaf = true;
1✔
415
          logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
1✔
416
        }
417
        handleClusterDiscovered();
1✔
418
      }
1✔
419

420
    }
421
  }
422
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc