• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19928

29 Jul 2025 12:36PM UTC coverage: 88.586% (-0.01%) from 88.599%
#19928

push

github

web-flow
Xds: Aggregate cluster fixes (A75) (#12186)

Instead of representing an aggregate cluster as a single cluster whose
priorities come from different underlying clusters, represent an aggregate cluster as an instance of a priority LB policy where each child is a cds LB policy for the underlying
cluster.

34654 of 39119 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.56
/../xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.collect.ImmutableMap;
25
import com.google.protobuf.Struct;
26
import io.grpc.Attributes;
27
import io.grpc.EquivalentAddressGroup;
28
import io.grpc.HttpConnectProxiedSocketAddress;
29
import io.grpc.InternalLogId;
30
import io.grpc.LoadBalancer;
31
import io.grpc.LoadBalancerProvider;
32
import io.grpc.LoadBalancerRegistry;
33
import io.grpc.NameResolver;
34
import io.grpc.NameResolver.ResolutionResult;
35
import io.grpc.Status;
36
import io.grpc.StatusOr;
37
import io.grpc.SynchronizationContext;
38
import io.grpc.SynchronizationContext.ScheduledHandle;
39
import io.grpc.internal.BackoffPolicy;
40
import io.grpc.internal.ExponentialBackoffPolicy;
41
import io.grpc.internal.ObjectPool;
42
import io.grpc.util.ForwardingLoadBalancerHelper;
43
import io.grpc.util.GracefulSwitchLoadBalancer;
44
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
45
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
46
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
47
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
48
import io.grpc.xds.Endpoints.DropOverload;
49
import io.grpc.xds.Endpoints.LbEndpoint;
50
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
51
import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection;
52
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
53
import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection;
54
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
55
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
56
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
57
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
58
import io.grpc.xds.client.Bootstrapper.ServerInfo;
59
import io.grpc.xds.client.Locality;
60
import io.grpc.xds.client.XdsClient;
61
import io.grpc.xds.client.XdsClient.ResourceWatcher;
62
import io.grpc.xds.client.XdsLogger;
63
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
64
import java.net.InetSocketAddress;
65
import java.net.SocketAddress;
66
import java.net.URI;
67
import java.net.URISyntaxException;
68
import java.util.ArrayList;
69
import java.util.Arrays;
70
import java.util.Collections;
71
import java.util.HashMap;
72
import java.util.HashSet;
73
import java.util.List;
74
import java.util.Locale;
75
import java.util.Map;
76
import java.util.Objects;
77
import java.util.Set;
78
import java.util.TreeMap;
79
import java.util.concurrent.ScheduledExecutorService;
80
import java.util.concurrent.TimeUnit;
81
import javax.annotation.Nullable;
82

83
/**
84
 * Load balancer for cluster_resolver_experimental LB policy. This LB policy is the child LB policy
85
 * of the cds_experimental LB policy and the parent LB policy of the priority_experimental LB
86
 * policy in the xDS load balancing hierarchy. This policy resolves endpoints of non-aggregate
87
 * clusters (e.g., EDS or Logical DNS) and groups endpoints in priorities and localities to be
88
 * used in the downstream LB policies for fine-grained load balancing purposes.
89
 */
90
final class ClusterResolverLoadBalancer extends LoadBalancer {
91
  // DNS-resolved endpoints do not specify the locality they belong to, so they are all assigned
92
  // to a hardcoded empty locality.
93
  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
1✔
94
  private final XdsLogger logger;
95
  private final SynchronizationContext syncContext;
96
  private final ScheduledExecutorService timeService;
97
  private final LoadBalancerRegistry lbRegistry;
98
  private final BackoffPolicy.Provider backoffPolicyProvider;
99
  private final GracefulSwitchLoadBalancer delegate;
100
  private ObjectPool<XdsClient> xdsClientPool;
101
  private XdsClient xdsClient;
102
  private ClusterResolverConfig config;
103

104
  ClusterResolverLoadBalancer(Helper helper) {
105
    this(helper, LoadBalancerRegistry.getDefaultRegistry(),
1✔
106
        new ExponentialBackoffPolicy.Provider());
107
  }
1✔
108

109
  @VisibleForTesting
110
  ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry,
111
      BackoffPolicy.Provider backoffPolicyProvider) {
1✔
112
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
113
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
114
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
115
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
116
    delegate = new GracefulSwitchLoadBalancer(helper);
1✔
117
    logger = XdsLogger.withLogId(
1✔
118
        InternalLogId.allocate("cluster-resolver-lb", helper.getAuthority()));
1✔
119
    logger.log(XdsLogLevel.INFO, "Created");
1✔
120
  }
1✔
121

122
  @Override
123
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
124
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
125
    if (xdsClientPool == null) {
1✔
126
      xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL);
1✔
127
      xdsClient = xdsClientPool.getObject();
1✔
128
    }
129
    ClusterResolverConfig config =
1✔
130
        (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
131
    if (!Objects.equals(this.config, config)) {
1✔
132
      logger.log(XdsLogLevel.DEBUG, "Config: {0}", config);
1✔
133
      this.config = config;
1✔
134
      Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
135
          new ClusterResolverLbStateFactory(), config);
136
      delegate.handleResolvedAddresses(
1✔
137
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
1✔
138
    }
139
    return Status.OK;
1✔
140
  }
141

142
  @Override
143
  public void handleNameResolutionError(Status error) {
144
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
145
    delegate.handleNameResolutionError(error);
1✔
146
  }
1✔
147

148
  @Override
149
  public void shutdown() {
150
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
151
    delegate.shutdown();
1✔
152
    if (xdsClientPool != null) {
1✔
153
      xdsClientPool.returnObject(xdsClient);
1✔
154
    }
155
  }
1✔
156

157
  private final class ClusterResolverLbStateFactory extends LoadBalancer.Factory {
1✔
158
    @Override
159
    public LoadBalancer newLoadBalancer(Helper helper) {
160
      return new ClusterResolverLbState(helper);
1✔
161
    }
162
  }
163

164
  /**
165
   * The state of a cluster_resolver LB working session. A new instance is created whenever
166
   * the cluster_resolver LB receives a new config. The old instance is replaced when the
167
   * new one is ready to handle new RPCs.
168
   */
169
  private final class ClusterResolverLbState extends LoadBalancer {
170
    private final Helper helper;
171
    private ClusterState clusterState;
172
    private String cluster;
173
    private Object endpointLbConfig;
174
    private ResolvedAddresses resolvedAddresses;
175
    private LoadBalancer childLb;
176

177
    ClusterResolverLbState(Helper helper) {
1✔
178
      this.helper = new RefreshableHelper(checkNotNull(helper, "helper"));
1✔
179
      logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState");
1✔
180
    }
1✔
181

182
    @Override
183
    public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
184
      this.resolvedAddresses = resolvedAddresses;
1✔
185
      ClusterResolverConfig config =
1✔
186
          (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
187
      endpointLbConfig = config.lbConfig;
1✔
188
      DiscoveryMechanism instance = config.discoveryMechanism;
1✔
189
      cluster = instance.cluster;
1✔
190
      if (instance.type == DiscoveryMechanism.Type.EDS) {
1✔
191
        clusterState = new EdsClusterState(instance.cluster, instance.edsServiceName,
1✔
192
            instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
193
            instance.filterMetadata, instance.outlierDetection);
194
      } else {  // logical DNS
195
        clusterState = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
1✔
196
            instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
197
            instance.filterMetadata);
198
      }
199
      clusterState.start();
1✔
200
      return Status.OK;
1✔
201
    }
202

203
    @Override
204
    public void handleNameResolutionError(Status error) {
205
      if (childLb != null) {
1✔
206
        childLb.handleNameResolutionError(error);
1✔
207
      } else {
208
        helper.updateBalancingState(
1✔
209
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
210
      }
211
    }
1✔
212

213
    @Override
214
    public void shutdown() {
215
      clusterState.shutdown();
1✔
216
      if (childLb != null) {
1✔
217
        childLb.shutdown();
1✔
218
      }
219
    }
1✔
220

221
    private void handleEndpointResourceUpdate() {
222
      List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
223
      Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
1✔
224
      List<String> priorities = new ArrayList<>();  // totally ordered priority list
1✔
225

226
      Status endpointNotFound = Status.OK;
1✔
227
      // Propagate endpoints to the child LB policy only after the cluster has been resolved.
228
      if (!clusterState.resolved && clusterState.status.isOk()) {
1✔
229
        return;
×
230
      }
231
      if (clusterState.result != null) {
1✔
232
        addresses.addAll(clusterState.result.addresses);
1✔
233
        priorityChildConfigs.putAll(clusterState.result.priorityChildConfigs);
1✔
234
        priorities.addAll(clusterState.result.priorities);
1✔
235
      } else {
236
        endpointNotFound = clusterState.status;
1✔
237
      }
238
      if (addresses.isEmpty()) {
1✔
239
        if (endpointNotFound.isOk()) {
1✔
240
          endpointNotFound = Status.UNAVAILABLE.withDescription(
1✔
241
              "No usable endpoint from cluster: " + cluster);
242
        } else {
243
          endpointNotFound =
1✔
244
              Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
1✔
245
                  .withDescription(endpointNotFound.getDescription());
1✔
246
        }
247
        helper.updateBalancingState(
1✔
248
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound)));
1✔
249
        if (childLb != null) {
1✔
250
          childLb.shutdown();
1✔
251
          childLb = null;
1✔
252
        }
253
        return;
1✔
254
      }
255
      PriorityLbConfig childConfig =
1✔
256
          new PriorityLbConfig(Collections.unmodifiableMap(priorityChildConfigs),
1✔
257
              Collections.unmodifiableList(priorities));
1✔
258
      if (childLb == null) {
1✔
259
        childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper);
1✔
260
      }
261
      childLb.handleResolvedAddresses(
1✔
262
          resolvedAddresses.toBuilder()
1✔
263
              .setLoadBalancingPolicyConfig(childConfig)
1✔
264
              .setAddresses(Collections.unmodifiableList(addresses))
1✔
265
              .build());
1✔
266
    }
1✔
267

268
    private void handleEndpointResolutionError() {
269
      if (!clusterState.status.isOk()) {
1✔
270
        if (childLb != null) {
1✔
271
          childLb.handleNameResolutionError(clusterState.status);
1✔
272
        } else {
273
          helper.updateBalancingState(
1✔
274
              TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(clusterState.status)));
1✔
275
        }
276
      }
277
    }
1✔
278

279
    /**
280
     * Wires re-resolution requests from downstream LB policies to the DNS resolver.
281
     */
282
    private final class RefreshableHelper extends ForwardingLoadBalancerHelper {
283
      private final Helper delegate;
284

285
      private RefreshableHelper(Helper delegate) {
1✔
286
        this.delegate = checkNotNull(delegate, "delegate");
1✔
287
      }
1✔
288

289
      @Override
290
      public void refreshNameResolution() {
291
        if (clusterState instanceof LogicalDnsClusterState) {
1✔
292
          ((LogicalDnsClusterState) clusterState).refresh();
1✔
293
        }
294
      }
1✔
295

296
      @Override
297
      protected Helper delegate() {
298
        return delegate;
1✔
299
      }
300
    }
301

302
    /**
303
     * Resolution state of an underlying cluster.
304
     */
305
    private abstract class ClusterState {
306
      // Name of the cluster to be resolved.
307
      protected final String name;
308
      @Nullable
309
      protected final ServerInfo lrsServerInfo;
310
      @Nullable
311
      protected final Long maxConcurrentRequests;
312
      @Nullable
313
      protected final UpstreamTlsContext tlsContext;
314
      protected final Map<String, Struct> filterMetadata;
315
      @Nullable
316
      protected final OutlierDetection outlierDetection;
317
      // Resolution status, may contain most recent error encountered.
318
      protected Status status = Status.OK;
1✔
319
      // True if a resolution result has been received.
320
      protected boolean resolved;
321
      // Most recently resolved addresses and config, or null if the resource does not exist.
322
      @Nullable
323
      protected ClusterResolutionResult result;
324

325
      protected boolean shutdown;
326

327
      private ClusterState(String name, @Nullable ServerInfo lrsServerInfo,
328
          @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
329
          Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
1✔
330
        this.name = name;
1✔
331
        this.lrsServerInfo = lrsServerInfo;
1✔
332
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
333
        this.tlsContext = tlsContext;
1✔
334
        this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
335
        this.outlierDetection = outlierDetection;
1✔
336
      }
1✔
337

338
      abstract void start();
339

340
      void shutdown() {
341
        shutdown = true;
1✔
342
      }
1✔
343
    }
344

345
    private final class EdsClusterState extends ClusterState implements ResourceWatcher<EdsUpdate> {
346
      @Nullable
347
      private final String edsServiceName;
348
      private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
1✔
349
      int priorityNameGenId = 1;
1✔
350

351
      private EdsClusterState(String name, @Nullable String edsServiceName,
352
          @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
353
          @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
354
          @Nullable OutlierDetection outlierDetection) {
1✔
355
        super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
1✔
356
            outlierDetection);
357
        this.edsServiceName = edsServiceName;
1✔
358
      }
1✔
359

360
      @Override
361
      void start() {
362
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
363
        logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
1✔
364
        xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
1✔
365
            resourceName, this, syncContext);
1✔
366
      }
1✔
367

368
      @Override
369
      protected void shutdown() {
370
        super.shutdown();
1✔
371
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
372
        logger.log(XdsLogLevel.INFO, "Stop watching EDS resource {0}", resourceName);
1✔
373
        xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), resourceName, this);
1✔
374
      }
1✔
375

376
      @Override
377
      public void onChanged(final EdsUpdate update) {
378
        class EndpointsUpdated implements Runnable {
1✔
379
          @Override
380
          public void run() {
381
            if (shutdown) {
1✔
382
              return;
×
383
            }
384
            logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update);
1✔
385
            if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
386
              logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories",
×
387
                  update.clusterName, update.localityLbEndpointsMap.size(),
×
388
                  update.dropPolicies.size());
×
389
            }
390
            Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
1✔
391
                update.localityLbEndpointsMap;
392
            List<DropOverload> dropOverloads = update.dropPolicies;
1✔
393
            List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
394
            Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
1✔
395
            List<String> sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints);
1✔
396
            for (Locality locality : localityLbEndpoints.keySet()) {
1✔
397
              LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
1✔
398
              String priorityName = localityPriorityNames.get(locality);
1✔
399
              boolean discard = true;
1✔
400
              for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
1✔
401
                if (endpoint.isHealthy()) {
1✔
402
                  discard = false;
1✔
403
                  long weight = localityLbInfo.localityWeight();
1✔
404
                  if (endpoint.loadBalancingWeight() != 0) {
1✔
405
                    weight *= endpoint.loadBalancingWeight();
1✔
406
                  }
407
                  String localityName = localityName(locality);
1✔
408
                  Attributes attr =
1✔
409
                      endpoint.eag().getAttributes().toBuilder()
1✔
410
                          .set(XdsAttributes.ATTR_LOCALITY, locality)
1✔
411
                          .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
1✔
412
                          .set(XdsAttributes.ATTR_LOCALITY_WEIGHT,
1✔
413
                              localityLbInfo.localityWeight())
1✔
414
                          .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
1✔
415
                          .set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
1✔
416
                          .build();
1✔
417

418
                  EquivalentAddressGroup eag;
419
                  if (config.isHttp11ProxyAvailable()) {
1✔
420
                    List<SocketAddress> rewrittenAddresses = new ArrayList<>();
1✔
421
                    for (SocketAddress addr : endpoint.eag().getAddresses()) {
1✔
422
                      rewrittenAddresses.add(rewriteAddress(
1✔
423
                          addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
1✔
424
                    }
1✔
425
                    eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
1✔
426
                  } else {
1✔
427
                    eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
1✔
428
                  }
429
                  eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
430
                  addresses.add(eag);
1✔
431
                }
432
              }
1✔
433
              if (discard) {
1✔
434
                logger.log(XdsLogLevel.INFO,
1✔
435
                    "Discard locality {0} with 0 healthy endpoints", locality);
436
                continue;
1✔
437
              }
438
              if (!prioritizedLocalityWeights.containsKey(priorityName)) {
1✔
439
                prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
1✔
440
              }
441
              prioritizedLocalityWeights.get(priorityName).put(
1✔
442
                  locality, localityLbInfo.localityWeight());
1✔
443
            }
1✔
444
            if (prioritizedLocalityWeights.isEmpty()) {
1✔
445
              // Will still update the result, as if the cluster resource were revoked.
446
              logger.log(XdsLogLevel.INFO,
1✔
447
                  "Cluster {0} has no usable priority/locality/endpoint", update.clusterName);
448
            }
449
            sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
1✔
450
            Map<String, PriorityChildConfig> priorityChildConfigs =
1✔
451
                generateEdsBasedPriorityChildConfigs(
1✔
452
                    name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
1✔
453
                    filterMetadata, outlierDetection, endpointLbConfig, lbRegistry,
1✔
454
                    prioritizedLocalityWeights, dropOverloads);
455
            status = Status.OK;
1✔
456
            resolved = true;
1✔
457
            result = new ClusterResolutionResult(addresses, priorityChildConfigs,
1✔
458
                sortedPriorityNames);
459
            handleEndpointResourceUpdate();
1✔
460
          }
1✔
461
        }
462

463
        new EndpointsUpdated().run();
1✔
464
      }
1✔
465

466
      private SocketAddress rewriteAddress(SocketAddress addr,
467
          ImmutableMap<String, Object> endpointMetadata,
468
          ImmutableMap<String, Object> localityMetadata) {
469
        if (!(addr instanceof InetSocketAddress)) {
1✔
470
          return addr;
×
471
        }
472

473
        SocketAddress proxyAddress;
474
        try {
475
          proxyAddress = (SocketAddress) endpointMetadata.get(
1✔
476
              "envoy.http11_proxy_transport_socket.proxy_address");
477
          if (proxyAddress == null) {
1✔
478
            proxyAddress = (SocketAddress) localityMetadata.get(
1✔
479
                "envoy.http11_proxy_transport_socket.proxy_address");
480
          }
481
        } catch (ClassCastException e) {
×
482
          return addr;
×
483
        }
1✔
484

485
        if (proxyAddress == null) {
1✔
486
          return addr;
×
487
        }
488

489
        return HttpConnectProxiedSocketAddress.newBuilder()
1✔
490
            .setTargetAddress((InetSocketAddress) addr)
1✔
491
            .setProxyAddress(proxyAddress)
1✔
492
            .build();
1✔
493
      }
494

495
      private List<String> generatePriorityNames(String name,
496
          Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
497
        TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
1✔
498
        for (Locality locality : localityLbEndpoints.keySet()) {
1✔
499
          int priority = localityLbEndpoints.get(locality).priority();
1✔
500
          if (!todo.containsKey(priority)) {
1✔
501
            todo.put(priority, new ArrayList<>());
1✔
502
          }
503
          todo.get(priority).add(locality);
1✔
504
        }
1✔
505
        Map<Locality, String> newNames = new HashMap<>();
1✔
506
        Set<String> usedNames = new HashSet<>();
1✔
507
        List<String> ret = new ArrayList<>();
1✔
508
        for (Integer priority: todo.keySet()) {
1✔
509
          String foundName = "";
1✔
510
          for (Locality locality : todo.get(priority)) {
1✔
511
            if (localityPriorityNames.containsKey(locality)
1✔
512
                && usedNames.add(localityPriorityNames.get(locality))) {
1✔
513
              foundName = localityPriorityNames.get(locality);
1✔
514
              break;
1✔
515
            }
516
          }
1✔
517
          if ("".equals(foundName)) {
1✔
518
            foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++);
1✔
519
          }
520
          for (Locality locality : todo.get(priority)) {
1✔
521
            newNames.put(locality, foundName);
1✔
522
          }
1✔
523
          ret.add(foundName);
1✔
524
        }
1✔
525
        localityPriorityNames = newNames;
1✔
526
        return ret;
1✔
527
      }
528

529
      @Override
530
      public void onResourceDoesNotExist(final String resourceName) {
531
        if (shutdown) {
1✔
532
          return;
×
533
        }
534
        logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
1✔
535
        status = Status.OK;
1✔
536
        resolved = true;
1✔
537
        result = null;  // resource revoked
1✔
538
        handleEndpointResourceUpdate();
1✔
539
      }
1✔
540

541
      @Override
542
      public void onError(final Status error) {
543
        if (shutdown) {
1✔
544
          return;
×
545
        }
546
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
547
        status = Status.UNAVAILABLE
1✔
548
            .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
1✔
549
                  resourceName, error.getCode(), error.getDescription()))
1✔
550
            .withCause(error.getCause());
1✔
551
        logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
1✔
552
        handleEndpointResolutionError();
1✔
553
      }
1✔
554
    }
555

556
    private final class LogicalDnsClusterState extends ClusterState {
557
      private final String dnsHostName;
558
      private final NameResolver.Factory nameResolverFactory;
559
      private final NameResolver.Args nameResolverArgs;
560
      private NameResolver resolver;
561
      @Nullable
562
      private BackoffPolicy backoffPolicy;
563
      @Nullable
564
      private ScheduledHandle scheduledRefresh;
565

566
      private LogicalDnsClusterState(String name, String dnsHostName,
567
          @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
568
          @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
1✔
569
        super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
1✔
570
        this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName");
1✔
571
        nameResolverFactory =
1✔
572
            checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory");
1✔
573
        nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs");
1✔
574
      }
1✔
575

576
      @Override
577
      void start() {
578
        URI uri;
579
        try {
580
          uri = new URI("dns", "", "/" + dnsHostName, null);
1✔
581
        } catch (URISyntaxException e) {
×
582
          status = Status.INTERNAL.withDescription(
×
583
              "Bug, invalid URI creation: " + dnsHostName).withCause(e);
×
584
          handleEndpointResolutionError();
×
585
          return;
×
586
        }
1✔
587
        resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs);
1✔
588
        if (resolver == null) {
1✔
589
          status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS "
×
590
              + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri);
591
          handleEndpointResolutionError();
×
592
          return;
×
593
        }
594
        resolver.start(new NameResolverListener(dnsHostName));
1✔
595
      }
1✔
596

597
      void refresh() {
598
        if (resolver == null) {
1✔
599
          return;
×
600
        }
601
        cancelBackoff();
1✔
602
        resolver.refresh();
1✔
603
      }
1✔
604

605
      @Override
606
      void shutdown() {
607
        super.shutdown();
1✔
608
        if (resolver != null) {
1✔
609
          resolver.shutdown();
1✔
610
        }
611
        cancelBackoff();
1✔
612
      }
1✔
613

614
      private void cancelBackoff() {
615
        if (scheduledRefresh != null) {
1✔
616
          scheduledRefresh.cancel();
1✔
617
          scheduledRefresh = null;
1✔
618
          backoffPolicy = null;
1✔
619
        }
620
      }
1✔
621

622
      private class DelayedNameResolverRefresh implements Runnable {
1✔
623
        @Override
624
        public void run() {
625
          scheduledRefresh = null;
1✔
626
          if (!shutdown) {
1✔
627
            resolver.refresh();
1✔
628
          }
629
        }
1✔
630
      }
631

632
      private class NameResolverListener extends NameResolver.Listener2 {
633
        private final String dnsHostName;
634

635
        NameResolverListener(String dnsHostName) {
1✔
636
          this.dnsHostName = dnsHostName;
1✔
637
        }
1✔
638

639
        @Override
640
        public void onResult(final ResolutionResult resolutionResult) {
641
          syncContext.execute(() -> onResult2(resolutionResult));
1✔
642
        }
1✔
643

644
        @Override
645
        public Status onResult2(final ResolutionResult resolutionResult) {
646
          if (shutdown) {
1✔
647
            return Status.OK;
×
648
          }
649
          // Arbitrary priority notation for all DNS-resolved endpoints.
650
          String priorityName = priorityName(name, 0);  // value doesn't matter
1✔
651
          List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
652
          StatusOr<List<EquivalentAddressGroup>> addressesOrError =
1✔
653
                  resolutionResult.getAddressesOrError();
1✔
654
          if (addressesOrError.hasValue()) {
1✔
655
            backoffPolicy = null;  // reset backoff sequence if succeeded
1✔
656
            for (EquivalentAddressGroup eag : addressesOrError.getValue()) {
1✔
657
              // No weight attribute is attached; all endpoint-level LB policies should be able
658
              // to handle that.
659
              String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
1✔
660
              Attributes attr = eag.getAttributes().toBuilder()
1✔
661
                      .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
1✔
662
                      .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
1✔
663
                      .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
1✔
664
                      .build();
1✔
665
              eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
1✔
666
              eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
667
              addresses.add(eag);
1✔
668
            }
1✔
669
            PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
1✔
670
                    name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
671
                    lbRegistry, Collections.<DropOverload>emptyList());
1✔
672
            status = Status.OK;
1✔
673
            resolved = true;
1✔
674
            result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
1✔
675
            handleEndpointResourceUpdate();
1✔
676
            return Status.OK;
1✔
677
          } else {
678
            handleErrorInSyncContext(addressesOrError.getStatus());
1✔
679
            return addressesOrError.getStatus();
1✔
680
          }
681
        }
682

683
        @Override
684
        public void onError(final Status error) {
685
          syncContext.execute(() -> handleErrorInSyncContext(error));
1✔
686
        }
1✔
687

688
        private void handleErrorInSyncContext(final Status error) {
689
          if (shutdown) {
1✔
690
            return;
×
691
          }
692
          status = error;
1✔
693
          // NameResolver.Listener API cannot distinguish between address-not-found and
694
          // transient errors. If the error occurs in the first resolution, treat it as
695
          // address not found. Otherwise, either there is previously resolved addresses
696
          // previously encountered error, propagate the error to downstream/upstream and
697
          // let downstream/upstream handle it.
698
          if (!resolved) {
1✔
699
            resolved = true;
1✔
700
            handleEndpointResourceUpdate();
1✔
701
          } else {
702
            handleEndpointResolutionError();
1✔
703
          }
704
          if (scheduledRefresh != null && scheduledRefresh.isPending()) {
1✔
705
            return;
×
706
          }
707
          if (backoffPolicy == null) {
1✔
708
            backoffPolicy = backoffPolicyProvider.get();
1✔
709
          }
710
          long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
711
          logger.log(XdsLogLevel.DEBUG,
1✔
712
                  "Logical DNS resolver for cluster {0} encountered name resolution "
713
                          + "error: {1}, scheduling DNS resolution backoff for {2} ns",
714
                  name, error, delayNanos);
1✔
715
          scheduledRefresh =
1✔
716
                  syncContext.schedule(
1✔
717
                          new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
718
                          timeService);
1✔
719
        }
1✔
720
      }
721
    }
722
  }
723

724
  private static class ClusterResolutionResult {
725
    // Endpoint addresses.
726
    private final List<EquivalentAddressGroup> addresses;
727
    // Config (include load balancing policy/config) for each priority in the cluster.
728
    private final Map<String, PriorityChildConfig> priorityChildConfigs;
729
    // List of priority names ordered in descending priorities.
730
    private final List<String> priorities;
731

732
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
733
        PriorityChildConfig config) {
734
      this(addresses, Collections.singletonMap(priority, config),
1✔
735
          Collections.singletonList(priority));
1✔
736
    }
1✔
737

738
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
739
        Map<String, PriorityChildConfig> configs, List<String> priorities) {
1✔
740
      this.addresses = addresses;
1✔
741
      this.priorityChildConfigs = configs;
1✔
742
      this.priorities = priorities;
1✔
743
    }
1✔
744
  }
745

746
  /**
747
   * Generates the config to be used in the priority LB policy for the single priority of
748
   * logical DNS cluster.
749
   *
750
   * <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
751
   */
752
  private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
753
      String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
754
      @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
755
      LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) {
756
    // Override endpoint-level LB policy with pick_first for logical DNS cluster.
757
    Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
758
        lbRegistry.getProvider("pick_first"), null);
1✔
759
    ClusterImplConfig clusterImplConfig =
1✔
760
        new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests,
761
            dropOverloads, endpointLbConfig, tlsContext, filterMetadata);
762
    LoadBalancerProvider clusterImplLbProvider =
1✔
763
        lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
764
    Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
765
        clusterImplLbProvider, clusterImplConfig);
766
    return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
1✔
767
  }
768

769
  /**
770
   * Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
771
   *
772
   * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
773
   * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental
774
   */
775
  private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
776
      String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo,
777
      @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
778
      Map<String, Struct> filterMetadata,
779
      @Nullable OutlierDetection outlierDetection, Object endpointLbConfig,
780
      LoadBalancerRegistry lbRegistry, Map<String,
781
      Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) {
782
    Map<String, PriorityChildConfig> configs = new HashMap<>();
1✔
783
    for (String priority : prioritizedLocalityWeights.keySet()) {
1✔
784
      ClusterImplConfig clusterImplConfig =
1✔
785
          new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
786
              dropOverloads, endpointLbConfig, tlsContext, filterMetadata);
787
      LoadBalancerProvider clusterImplLbProvider =
1✔
788
          lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
789
      Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
790
          clusterImplLbProvider, clusterImplConfig);
791

792
      // If outlier detection has been configured we wrap the child policy in the outlier detection
793
      // load balancer.
794
      if (outlierDetection != null) {
1✔
795
        LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider(
1✔
796
            "outlier_detection_experimental");
797
        priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
798
            outlierDetectionProvider,
799
            buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy));
1✔
800
      }
801

802
      PriorityChildConfig priorityChildConfig =
1✔
803
          new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */);
804
      configs.put(priority, priorityChildConfig);
1✔
805
    }
1✔
806
    return configs;
1✔
807
  }
808

809
  /**
810
   * Converts {@link OutlierDetection} that represents the xDS configuration to {@link
811
   * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer}
812
   * understands.
813
   */
814
  private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig(
815
      OutlierDetection outlierDetection, Object childConfig) {
816
    OutlierDetectionLoadBalancerConfig.Builder configBuilder
1✔
817
        = new OutlierDetectionLoadBalancerConfig.Builder();
818

819
    configBuilder.setChildConfig(childConfig);
1✔
820

821
    if (outlierDetection.intervalNanos() != null) {
1✔
822
      configBuilder.setIntervalNanos(outlierDetection.intervalNanos());
1✔
823
    }
824
    if (outlierDetection.baseEjectionTimeNanos() != null) {
1✔
825
      configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos());
1✔
826
    }
827
    if (outlierDetection.maxEjectionTimeNanos() != null) {
1✔
828
      configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos());
1✔
829
    }
830
    if (outlierDetection.maxEjectionPercent() != null) {
1✔
831
      configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent());
1✔
832
    }
833

834
    SuccessRateEjection successRate = outlierDetection.successRateEjection();
1✔
835
    if (successRate != null) {
1✔
836
      OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder
837
          successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
838
          .SuccessRateEjection.Builder();
839

840
      if (successRate.stdevFactor() != null) {
1✔
841
        successRateConfigBuilder.setStdevFactor(successRate.stdevFactor());
1✔
842
      }
843
      if (successRate.enforcementPercentage() != null) {
1✔
844
        successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage());
1✔
845
      }
846
      if (successRate.minimumHosts() != null) {
1✔
847
        successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts());
1✔
848
      }
849
      if (successRate.requestVolume() != null) {
1✔
850
        successRateConfigBuilder.setRequestVolume(successRate.requestVolume());
1✔
851
      }
852

853
      configBuilder.setSuccessRateEjection(successRateConfigBuilder.build());
1✔
854
    }
855

856
    FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection();
1✔
857
    if (failurePercentage != null) {
1✔
858
      OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder
859
          failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
860
          .FailurePercentageEjection.Builder();
861

862
      if (failurePercentage.threshold() != null) {
1✔
863
        failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold());
1✔
864
      }
865
      if (failurePercentage.enforcementPercentage() != null) {
1✔
866
        failurePercentageConfigBuilder.setEnforcementPercentage(
1✔
867
            failurePercentage.enforcementPercentage());
1✔
868
      }
869
      if (failurePercentage.minimumHosts() != null) {
1✔
870
        failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts());
1✔
871
      }
872
      if (failurePercentage.requestVolume() != null) {
1✔
873
        failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume());
1✔
874
      }
875

876
      configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build());
1✔
877
    }
878

879
    return configBuilder.build();
1✔
880
  }
881

882
  /**
883
   * Generates a string that represents the priority in the LB policy config. The string is unique
884
   * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2.
885
   * The ordering is undefined for priorities in different clusters.
886
   */
887
  private static String priorityName(String cluster, int priority) {
888
    return cluster + "[child" + priority + "]";
1✔
889
  }
890

891
  /**
892
   * Generates a string that represents the locality in the LB policy config. The string is unique
893
   * across all localities in all clusters.
894
   */
895
  private static String localityName(Locality locality) {
896
    return "{region=\"" + locality.region()
1✔
897
        + "\", zone=\"" + locality.zone()
1✔
898
        + "\", sub_zone=\"" + locality.subZone()
1✔
899
        + "\"}";
900
  }
901
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc