• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19249

24 May 2024 10:08PM UTC coverage: 88.444% (-0.07%) from 88.512%
#19249

push

github

web-flow
xds: Plumb the Cluster's filterMetadata to RPCs

This will be used by CSM observability, and may get exposed to further
uses in the future.

32060 of 36249 relevant lines covered (88.44%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.36
/../xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.InternalLogId;
25
import io.grpc.LoadBalancer;
26
import io.grpc.LoadBalancerProvider;
27
import io.grpc.LoadBalancerRegistry;
28
import io.grpc.NameResolver;
29
import io.grpc.Status;
30
import io.grpc.SynchronizationContext;
31
import io.grpc.internal.ObjectPool;
32
import io.grpc.internal.ServiceConfigUtil;
33
import io.grpc.internal.ServiceConfigUtil.LbConfig;
34
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
35
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
36
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
37
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
38
import io.grpc.xds.XdsClusterResource.CdsUpdate;
39
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
40
import io.grpc.xds.client.XdsClient;
41
import io.grpc.xds.client.XdsClient.ResourceWatcher;
42
import io.grpc.xds.client.XdsLogger;
43
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
44
import java.util.ArrayDeque;
45
import java.util.ArrayList;
46
import java.util.Collections;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.LinkedHashMap;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Queue;
53
import java.util.Set;
54
import java.util.concurrent.ConcurrentHashMap;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
59
 * The top-level cluster may be a plain EDS/logical-DNS cluster or an aggregate cluster formed
60
 * by a group of sub-clusters in a tree hierarchy.
61
 */
62
final class CdsLoadBalancer2 extends LoadBalancer {
63
  private final XdsLogger logger;
64
  private final Helper helper;
65
  private final SynchronizationContext syncContext;
66
  private final LoadBalancerRegistry lbRegistry;
67
  // Following fields are effectively final.
68
  private ObjectPool<XdsClient> xdsClientPool;
69
  private XdsClient xdsClient;
70
  private CdsLbState cdsLbState;
71
  private ResolvedAddresses resolvedAddresses;
72

73
  CdsLoadBalancer2(Helper helper) {
74
    this(helper, LoadBalancerRegistry.getDefaultRegistry());
1✔
75
  }
1✔
76

77
  @VisibleForTesting
78
  CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
1✔
79
    this.helper = checkNotNull(helper, "helper");
1✔
80
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
81
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
82
    logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
1✔
83
    logger.log(XdsLogLevel.INFO, "Created");
1✔
84
  }
1✔
85

86
  @Override
87
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
88
    if (this.resolvedAddresses != null) {
1✔
89
      return Status.OK;
×
90
    }
91
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
92
    this.resolvedAddresses = resolvedAddresses;
1✔
93
    xdsClientPool = resolvedAddresses.getAttributes().get(InternalXdsAttributes.XDS_CLIENT_POOL);
1✔
94
    xdsClient = xdsClientPool.getObject();
1✔
95
    CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
96
    logger.log(XdsLogLevel.INFO, "Config: {0}", config);
1✔
97
    cdsLbState = new CdsLbState(config.name);
1✔
98
    cdsLbState.start();
1✔
99
    return Status.OK;
1✔
100
  }
101

102
  @Override
103
  public void handleNameResolutionError(Status error) {
104
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
105
    if (cdsLbState != null && cdsLbState.childLb != null) {
1✔
106
      cdsLbState.childLb.handleNameResolutionError(error);
1✔
107
    } else {
108
      helper.updateBalancingState(
1✔
109
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
110
    }
111
  }
1✔
112

113
  @Override
114
  public void shutdown() {
115
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
116
    if (cdsLbState != null) {
1✔
117
      cdsLbState.shutdown();
1✔
118
    }
119
    if (xdsClientPool != null) {
1✔
120
      xdsClientPool.returnObject(xdsClient);
1✔
121
    }
122
  }
1✔
123

124
  /**
125
   * The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when
126
   * receiving the CDS LB policy config with the top-level cluster name.
127
   */
128
  private final class CdsLbState {
129

130
    private final ClusterState root;
131
    private final Map<String, ClusterState> clusterStates = new ConcurrentHashMap<>();
1✔
132
    private LoadBalancer childLb;
133

134
    private CdsLbState(String rootCluster) {
1✔
135
      root = new ClusterState(rootCluster);
1✔
136
    }
1✔
137

138
    private void start() {
139
      root.start();
1✔
140
    }
1✔
141

142
    private void shutdown() {
143
      root.shutdown();
1✔
144
      if (childLb != null) {
1✔
145
        childLb.shutdown();
1✔
146
      }
147
    }
1✔
148

149
    private void handleClusterDiscovered() {
150
      List<DiscoveryMechanism> instances = new ArrayList<>();
1✔
151

152
      // Used for loop detection to break the infinite recursion that loops would cause
153
      Map<ClusterState, List<ClusterState>> parentClusters = new HashMap<>();
1✔
154
      Status loopStatus = null;
1✔
155

156
      // Level-order traversal.
157
      // Collect configurations for all non-aggregate (leaf) clusters.
158
      Queue<ClusterState> queue = new ArrayDeque<>();
1✔
159
      queue.add(root);
1✔
160
      while (!queue.isEmpty()) {
1✔
161
        int size = queue.size();
1✔
162
        for (int i = 0; i < size; i++) {
1✔
163
          ClusterState clusterState = queue.remove();
1✔
164
          if (!clusterState.discovered) {
1✔
165
            return;  // do not proceed until all clusters discovered
1✔
166
          }
167
          if (clusterState.result == null) {  // resource revoked or not exists
1✔
168
            continue;
1✔
169
          }
170
          if (clusterState.isLeaf) {
1✔
171
            if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) {
1✔
172
              DiscoveryMechanism instance;
173
              if (clusterState.result.clusterType() == ClusterType.EDS) {
1✔
174
                instance = DiscoveryMechanism.forEds(
1✔
175
                    clusterState.name, clusterState.result.edsServiceName(),
1✔
176
                    clusterState.result.lrsServerInfo(),
1✔
177
                    clusterState.result.maxConcurrentRequests(),
1✔
178
                    clusterState.result.upstreamTlsContext(),
1✔
179
                    clusterState.result.filterMetadata(),
1✔
180
                    clusterState.result.outlierDetection());
1✔
181
              } else {  // logical DNS
182
                instance = DiscoveryMechanism.forLogicalDns(
1✔
183
                    clusterState.name, clusterState.result.dnsHostName(),
1✔
184
                    clusterState.result.lrsServerInfo(),
1✔
185
                    clusterState.result.maxConcurrentRequests(),
1✔
186
                    clusterState.result.upstreamTlsContext(),
1✔
187
                    clusterState.result.filterMetadata());
1✔
188
              }
189
              instances.add(instance);
1✔
190
            }
1✔
191
          } else {
192
            if (clusterState.childClusterStates == null) {
1✔
193
              continue;
×
194
            }
195
            // Do loop detection and break recursion if detected
196
            List<String> namesCausingLoops = identifyLoops(clusterState, parentClusters);
1✔
197
            if (namesCausingLoops.isEmpty()) {
1✔
198
              queue.addAll(clusterState.childClusterStates.values());
1✔
199
            } else {
200
              // Do cleanup
201
              if (childLb != null) {
1✔
202
                childLb.shutdown();
1✔
203
                childLb = null;
1✔
204
              }
205
              if (loopStatus != null) {
1✔
206
                logger.log(XdsLogLevel.WARNING,
×
207
                    "Multiple loops in CDS config.  Old msg:  " + loopStatus.getDescription());
×
208
              }
209
              loopStatus = Status.UNAVAILABLE.withDescription(String.format(
1✔
210
                  "CDS error: circular aggregate clusters directly under %s for "
211
                      + "root cluster %s, named %s",
212
                  clusterState.name, root.name, namesCausingLoops));
1✔
213
            }
214
          }
215
        }
216
      }
1✔
217

218
      if (loopStatus != null) {
1✔
219
        helper.updateBalancingState(
1✔
220
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
1✔
221
        return;
1✔
222
      }
223

224
      if (instances.isEmpty()) {  // none of non-aggregate clusters exists
1✔
225
        if (childLb != null) {
1✔
226
          childLb.shutdown();
1✔
227
          childLb = null;
1✔
228
        }
229
        Status unavailable =
1✔
230
            Status.UNAVAILABLE.withDescription("CDS error: found 0 leaf (logical DNS or EDS) "
1✔
231
                + "clusters for root cluster " + root.name);
1✔
232
        helper.updateBalancingState(
1✔
233
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
1✔
234
        return;
1✔
235
      }
236

237
      // The LB policy config is provided in service_config.proto/JSON format. It is unwrapped
238
      // to determine the name of the policy in the load balancer registry.
239
      LbConfig unwrappedLbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
1✔
240
          root.result.lbPolicyConfig());
1✔
241
      LoadBalancerProvider lbProvider = lbRegistry.getProvider(unwrappedLbConfig.getPolicyName());
1✔
242
      if (lbProvider == null) {
1✔
243
        throw NameResolver.ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
1✔
244
                "No provider available for LB: " + unwrappedLbConfig.getPolicyName())).getError()
1✔
245
            .asRuntimeException();
1✔
246
      }
247
      NameResolver.ConfigOrError configOrError = lbProvider.parseLoadBalancingPolicyConfig(
1✔
248
          unwrappedLbConfig.getRawConfigValue());
1✔
249
      if (configOrError.getError() != null) {
1✔
250
        throw configOrError.getError().augmentDescription("Unable to parse the LB config")
1✔
251
            .asRuntimeException();
1✔
252
      }
253

254
      ClusterResolverConfig config = new ClusterResolverConfig(
1✔
255
          Collections.unmodifiableList(instances),
1✔
256
          new PolicySelection(lbProvider, configOrError.getConfig()));
1✔
257
      if (childLb == null) {
1✔
258
        childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
1✔
259
      }
260
      childLb.handleResolvedAddresses(
1✔
261
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
1✔
262
    }
1✔
263

264
    /**
265
     * Returns children that would cause loops and builds up the parentClusters map.
266
     **/
267

268
    private List<String> identifyLoops(ClusterState clusterState,
269
        Map<ClusterState, List<ClusterState>> parentClusters) {
270
      Set<String> ancestors = new HashSet<>();
1✔
271
      ancestors.add(clusterState.name);
1✔
272
      addAncestors(ancestors, clusterState, parentClusters);
1✔
273

274
      List<String> namesCausingLoops = new ArrayList<>();
1✔
275
      for (ClusterState state : clusterState.childClusterStates.values()) {
1✔
276
        if (ancestors.contains(state.name)) {
1✔
277
          namesCausingLoops.add(state.name);
1✔
278
        }
279
      }
1✔
280

281
      // Update parent map with entries from remaining children to clusterState
282
      clusterState.childClusterStates.values().stream()
1✔
283
          .filter(child -> !namesCausingLoops.contains(child.name))
1✔
284
          .forEach(
1✔
285
              child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>())
1✔
286
                  .add(clusterState));
1✔
287

288
      return namesCausingLoops;
1✔
289
    }
290

291
    /** Recursively add all parents to the ancestors list. **/
292
    private void addAncestors(Set<String> ancestors, ClusterState clusterState,
293
        Map<ClusterState, List<ClusterState>> parentClusters) {
294
      List<ClusterState> directParents = parentClusters.get(clusterState);
1✔
295
      if (directParents != null) {
1✔
296
        directParents.stream().map(c -> c.name).forEach(ancestors::add);
1✔
297
        directParents.forEach(p -> addAncestors(ancestors, p, parentClusters));
1✔
298
      }
299
    }
1✔
300

301
    private void handleClusterDiscoveryError(Status error) {
302
      if (childLb != null) {
1✔
303
        childLb.handleNameResolutionError(error);
1✔
304
      } else {
305
        helper.updateBalancingState(
1✔
306
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
307
      }
308
    }
1✔
309

310
    private final class ClusterState implements ResourceWatcher<CdsUpdate> {
311
      private final String name;
312
      @Nullable
313
      private Map<String, ClusterState> childClusterStates;
314
      @Nullable
315
      private CdsUpdate result;
316
      // Following fields are effectively final.
317
      private boolean isLeaf;
318
      private boolean discovered;
319
      private boolean shutdown;
320

321
      private ClusterState(String name) {
1✔
322
        this.name = name;
1✔
323
      }
1✔
324

325
      private void start() {
326
        shutdown = false;
1✔
327
        xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
1✔
328
      }
1✔
329

330
      void shutdown() {
331
        shutdown = true;
1✔
332
        xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
1✔
333
        if (childClusterStates != null) {
1✔
334
          // recursively shut down all descendants
335
          childClusterStates.values().stream()
1✔
336
              .filter(state -> !state.shutdown)
1✔
337
              .forEach(ClusterState::shutdown);
1✔
338
        }
339
      }
1✔
340

341
      @Override
342
      public void onError(Status error) {
343
        Status status = Status.UNAVAILABLE
1✔
344
            .withDescription(
1✔
345
                String.format("Unable to load CDS %s. xDS server returned: %s: %s",
1✔
346
                  name, error.getCode(), error.getDescription()))
1✔
347
            .withCause(error.getCause());
1✔
348
        if (shutdown) {
1✔
349
          return;
×
350
        }
351
        // All watchers should receive the same error, so we only propagate it once.
352
        if (ClusterState.this == root) {
1✔
353
          handleClusterDiscoveryError(status);
1✔
354
        }
355
      }
1✔
356

357
      @Override
358
      public void onResourceDoesNotExist(String resourceName) {
359
        if (shutdown) {
1✔
360
          return;
×
361
        }
362
        discovered = true;
1✔
363
        result = null;
1✔
364
        if (childClusterStates != null) {
1✔
365
          for (ClusterState state : childClusterStates.values()) {
1✔
366
            state.shutdown();
1✔
367
          }
1✔
368
          childClusterStates = null;
1✔
369
        }
370
        handleClusterDiscovered();
1✔
371
      }
1✔
372

373
      @Override
374
      public void onChanged(final CdsUpdate update) {
375
        if (shutdown) {
1✔
376
          return;
×
377
        }
378
        logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
1✔
379
        discovered = true;
1✔
380
        result = update;
1✔
381
        if (update.clusterType() == ClusterType.AGGREGATE) {
1✔
382
          isLeaf = false;
1✔
383
          logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
1✔
384
              update.clusterName(), update.prioritizedClusterNames());
1✔
385
          Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
1✔
386
          for (String cluster : update.prioritizedClusterNames()) {
1✔
387
            if (newChildStates.containsKey(cluster)) {
1✔
388
              logger.log(XdsLogLevel.WARNING,
1✔
389
                  String.format("duplicate cluster name %s in aggregate %s is being ignored",
1✔
390
                      cluster, update.clusterName()));
1✔
391
              continue;
1✔
392
            }
393
            if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
1✔
394
              ClusterState childState;
395
              if (clusterStates.containsKey(cluster)) {
1✔
396
                childState = clusterStates.get(cluster);
1✔
397
                if (childState.shutdown) {
1✔
398
                  childState.start();
×
399
                }
400
              } else {
401
                childState = new ClusterState(cluster);
1✔
402
                clusterStates.put(cluster, childState);
1✔
403
                childState.start();
1✔
404
              }
405
              newChildStates.put(cluster, childState);
1✔
406
            } else {
1✔
407
              newChildStates.put(cluster, childClusterStates.remove(cluster));
1✔
408
            }
409
          }
1✔
410
          if (childClusterStates != null) {  // stop subscribing to revoked child clusters
1✔
411
            for (ClusterState watcher : childClusterStates.values()) {
1✔
412
              watcher.shutdown();
1✔
413
            }
1✔
414
          }
415
          childClusterStates = newChildStates;
1✔
416
        } else if (update.clusterType() == ClusterType.EDS) {
1✔
417
          isLeaf = true;
1✔
418
          logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
1✔
419
              update.clusterName(), update.edsServiceName());
1✔
420
        } else {  // logical DNS
421
          isLeaf = true;
1✔
422
          logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
1✔
423
        }
424
        handleClusterDiscovered();
1✔
425
      }
1✔
426

427
    }
428
  }
429
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc