• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19715

06 Mar 2025 08:10AM UTC coverage: 88.479% (-0.04%) from 88.515%
#19715

push

github

web-flow
xds: xDS-based HTTP CONNECT configuration (#11861)

34489 of 38980 relevant lines covered (88.48%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.98
/../xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.collect.ImmutableMap;
25
import com.google.protobuf.Struct;
26
import io.grpc.Attributes;
27
import io.grpc.EquivalentAddressGroup;
28
import io.grpc.HttpConnectProxiedSocketAddress;
29
import io.grpc.InternalLogId;
30
import io.grpc.LoadBalancer;
31
import io.grpc.LoadBalancerProvider;
32
import io.grpc.LoadBalancerRegistry;
33
import io.grpc.NameResolver;
34
import io.grpc.NameResolver.ResolutionResult;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext;
37
import io.grpc.SynchronizationContext.ScheduledHandle;
38
import io.grpc.internal.BackoffPolicy;
39
import io.grpc.internal.ExponentialBackoffPolicy;
40
import io.grpc.internal.ObjectPool;
41
import io.grpc.util.ForwardingLoadBalancerHelper;
42
import io.grpc.util.GracefulSwitchLoadBalancer;
43
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
44
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
45
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
46
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
47
import io.grpc.xds.Endpoints.DropOverload;
48
import io.grpc.xds.Endpoints.LbEndpoint;
49
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
50
import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection;
51
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
52
import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection;
53
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
54
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
55
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
56
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
57
import io.grpc.xds.client.Bootstrapper.ServerInfo;
58
import io.grpc.xds.client.Locality;
59
import io.grpc.xds.client.XdsClient;
60
import io.grpc.xds.client.XdsClient.ResourceWatcher;
61
import io.grpc.xds.client.XdsLogger;
62
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
63
import java.net.InetSocketAddress;
64
import java.net.SocketAddress;
65
import java.net.URI;
66
import java.net.URISyntaxException;
67
import java.util.ArrayList;
68
import java.util.Arrays;
69
import java.util.Collections;
70
import java.util.HashMap;
71
import java.util.HashSet;
72
import java.util.List;
73
import java.util.Locale;
74
import java.util.Map;
75
import java.util.Objects;
76
import java.util.Set;
77
import java.util.TreeMap;
78
import java.util.concurrent.ScheduledExecutorService;
79
import java.util.concurrent.TimeUnit;
80
import javax.annotation.Nullable;
81

82
/**
83
 * Load balancer for cluster_resolver_experimental LB policy. This LB policy is the child LB policy
84
 * of the cds_experimental LB policy and the parent LB policy of the priority_experimental LB
85
 * policy in the xDS load balancing hierarchy. This policy resolves endpoints of non-aggregate
86
 * clusters (e.g., EDS or Logical DNS) and groups endpoints in priorities and localities to be
87
 * used in the downstream LB policies for fine-grained load balancing purposes.
88
 */
89
final class ClusterResolverLoadBalancer extends LoadBalancer {
90
  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
91
  // to an empty locality.
92
  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
1✔
93
  private final XdsLogger logger;
94
  private final SynchronizationContext syncContext;
95
  private final ScheduledExecutorService timeService;
96
  private final LoadBalancerRegistry lbRegistry;
97
  private final BackoffPolicy.Provider backoffPolicyProvider;
98
  private final GracefulSwitchLoadBalancer delegate;
99
  private ObjectPool<XdsClient> xdsClientPool;
100
  private XdsClient xdsClient;
101
  private ClusterResolverConfig config;
102

103
  ClusterResolverLoadBalancer(Helper helper) {
104
    this(helper, LoadBalancerRegistry.getDefaultRegistry(),
1✔
105
        new ExponentialBackoffPolicy.Provider());
106
  }
1✔
107

108
  @VisibleForTesting
109
  ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry,
110
      BackoffPolicy.Provider backoffPolicyProvider) {
1✔
111
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
112
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
113
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
114
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
115
    delegate = new GracefulSwitchLoadBalancer(helper);
1✔
116
    logger = XdsLogger.withLogId(
1✔
117
        InternalLogId.allocate("cluster-resolver-lb", helper.getAuthority()));
1✔
118
    logger.log(XdsLogLevel.INFO, "Created");
1✔
119
  }
1✔
120

121
  @Override
122
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
123
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
124
    if (xdsClientPool == null) {
1✔
125
      xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL);
1✔
126
      xdsClient = xdsClientPool.getObject();
1✔
127
    }
128
    ClusterResolverConfig config =
1✔
129
        (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
130
    if (!Objects.equals(this.config, config)) {
1✔
131
      logger.log(XdsLogLevel.DEBUG, "Config: {0}", config);
1✔
132
      this.config = config;
1✔
133
      Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
134
          new ClusterResolverLbStateFactory(), config);
135
      delegate.handleResolvedAddresses(
1✔
136
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
1✔
137
    }
138
    return Status.OK;
1✔
139
  }
140

141
  @Override
142
  public void handleNameResolutionError(Status error) {
143
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
144
    delegate.handleNameResolutionError(error);
1✔
145
  }
1✔
146

147
  @Override
148
  public void shutdown() {
149
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
150
    delegate.shutdown();
1✔
151
    if (xdsClientPool != null) {
1✔
152
      xdsClientPool.returnObject(xdsClient);
1✔
153
    }
154
  }
1✔
155

156
  private final class ClusterResolverLbStateFactory extends LoadBalancer.Factory {
1✔
157
    @Override
158
    public LoadBalancer newLoadBalancer(Helper helper) {
159
      return new ClusterResolverLbState(helper);
1✔
160
    }
161
  }
162

163
  /**
164
   * The state of a cluster_resolver LB working session. A new instance is created whenever
165
   * the cluster_resolver LB receives a new config. The old instance is replaced when the
166
   * new one is ready to handle new RPCs.
167
   */
168
  private final class ClusterResolverLbState extends LoadBalancer {
169
    private final Helper helper;
170
    private final List<String> clusters = new ArrayList<>();
1✔
171
    private final Map<String, ClusterState> clusterStates = new HashMap<>();
1✔
172
    private Object endpointLbConfig;
173
    private ResolvedAddresses resolvedAddresses;
174
    private LoadBalancer childLb;
175

176
    ClusterResolverLbState(Helper helper) {
1✔
177
      this.helper = new RefreshableHelper(checkNotNull(helper, "helper"));
1✔
178
      logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState");
1✔
179
    }
1✔
180

181
    @Override
182
    public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
183
      this.resolvedAddresses = resolvedAddresses;
1✔
184
      ClusterResolverConfig config =
1✔
185
          (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
186
      endpointLbConfig = config.lbConfig;
1✔
187
      for (DiscoveryMechanism instance : config.discoveryMechanisms) {
1✔
188
        clusters.add(instance.cluster);
1✔
189
        ClusterState state;
190
        if (instance.type == DiscoveryMechanism.Type.EDS) {
1✔
191
          state = new EdsClusterState(instance.cluster, instance.edsServiceName,
1✔
192
              instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
193
              instance.filterMetadata, instance.outlierDetection);
194
        } else {  // logical DNS
195
          state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
1✔
196
              instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
197
              instance.filterMetadata);
198
        }
199
        clusterStates.put(instance.cluster, state);
1✔
200
        state.start();
1✔
201
      }
1✔
202
      return Status.OK;
1✔
203
    }
204

205
    @Override
206
    public void handleNameResolutionError(Status error) {
207
      if (childLb != null) {
1✔
208
        childLb.handleNameResolutionError(error);
1✔
209
      } else {
210
        helper.updateBalancingState(
1✔
211
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
212
      }
213
    }
1✔
214

215
    @Override
216
    public void shutdown() {
217
      for (ClusterState state : clusterStates.values()) {
1✔
218
        state.shutdown();
1✔
219
      }
1✔
220
      if (childLb != null) {
1✔
221
        childLb.shutdown();
1✔
222
      }
223
    }
1✔
224

225
    private void handleEndpointResourceUpdate() {
226
      List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
227
      Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
1✔
228
      List<String> priorities = new ArrayList<>();  // totally ordered priority list
1✔
229

230
      Status endpointNotFound = Status.OK;
1✔
231
      for (String cluster : clusters) {
1✔
232
        ClusterState state = clusterStates.get(cluster);
1✔
233
        // Propagate endpoints to the child LB policy only after all clusters have been resolved.
234
        if (!state.resolved && state.status.isOk()) {
1✔
235
          return;
1✔
236
        }
237
        if (state.result != null) {
1✔
238
          addresses.addAll(state.result.addresses);
1✔
239
          priorityChildConfigs.putAll(state.result.priorityChildConfigs);
1✔
240
          priorities.addAll(state.result.priorities);
1✔
241
        } else {
242
          endpointNotFound = state.status;
1✔
243
        }
244
      }
1✔
245
      if (addresses.isEmpty()) {
1✔
246
        if (endpointNotFound.isOk()) {
1✔
247
          endpointNotFound = Status.UNAVAILABLE.withDescription(
1✔
248
              "No usable endpoint from cluster(s): " + clusters);
249
        } else {
250
          endpointNotFound =
1✔
251
              Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
1✔
252
                  .withDescription(endpointNotFound.getDescription());
1✔
253
        }
254
        helper.updateBalancingState(
1✔
255
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound)));
1✔
256
        if (childLb != null) {
1✔
257
          childLb.shutdown();
1✔
258
          childLb = null;
1✔
259
        }
260
        return;
1✔
261
      }
262
      PriorityLbConfig childConfig =
1✔
263
          new PriorityLbConfig(Collections.unmodifiableMap(priorityChildConfigs),
1✔
264
              Collections.unmodifiableList(priorities));
1✔
265
      if (childLb == null) {
1✔
266
        childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper);
1✔
267
      }
268
      childLb.handleResolvedAddresses(
1✔
269
          resolvedAddresses.toBuilder()
1✔
270
              .setLoadBalancingPolicyConfig(childConfig)
1✔
271
              .setAddresses(Collections.unmodifiableList(addresses))
1✔
272
              .build());
1✔
273
    }
1✔
274

275
    private void handleEndpointResolutionError() {
276
      boolean allInError = true;
1✔
277
      Status error = null;
1✔
278
      for (String cluster : clusters) {
1✔
279
        ClusterState state = clusterStates.get(cluster);
1✔
280
        if (state.status.isOk()) {
1✔
281
          allInError = false;
1✔
282
        } else {
283
          error = state.status;
1✔
284
        }
285
      }
1✔
286
      if (allInError) {
1✔
287
        if (childLb != null) {
1✔
288
          childLb.handleNameResolutionError(error);
1✔
289
        } else {
290
          helper.updateBalancingState(
1✔
291
              TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
292
        }
293
      }
294
    }
1✔
295

296
    /**
297
     * Wires re-resolution requests from downstream LB policies with DNS resolver.
298
     */
299
    private final class RefreshableHelper extends ForwardingLoadBalancerHelper {
300
      private final Helper delegate;
301

302
      private RefreshableHelper(Helper delegate) {
1✔
303
        this.delegate = checkNotNull(delegate, "delegate");
1✔
304
      }
1✔
305

306
      @Override
307
      public void refreshNameResolution() {
308
        for (ClusterState state : clusterStates.values()) {
1✔
309
          if (state instanceof LogicalDnsClusterState) {
1✔
310
            ((LogicalDnsClusterState) state).refresh();
1✔
311
          }
312
        }
1✔
313
      }
1✔
314

315
      @Override
316
      protected Helper delegate() {
317
        return delegate;
1✔
318
      }
319
    }
320

321
    /**
322
     * Resolution state of an underlying cluster.
323
     */
324
    private abstract class ClusterState {
325
      // Name of the cluster to be resolved.
326
      protected final String name;
327
      @Nullable
328
      protected final ServerInfo lrsServerInfo;
329
      @Nullable
330
      protected final Long maxConcurrentRequests;
331
      @Nullable
332
      protected final UpstreamTlsContext tlsContext;
333
      protected final Map<String, Struct> filterMetadata;
334
      @Nullable
335
      protected final OutlierDetection outlierDetection;
336
      // Resolution status, may contain most recent error encountered.
337
      protected Status status = Status.OK;
1✔
338
      // True if has received resolution result.
339
      protected boolean resolved;
340
      // Most recently resolved addresses and config, or null if resource not exists.
341
      @Nullable
342
      protected ClusterResolutionResult result;
343

344
      protected boolean shutdown;
345

346
      private ClusterState(String name, @Nullable ServerInfo lrsServerInfo,
347
          @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
348
          Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
1✔
349
        this.name = name;
1✔
350
        this.lrsServerInfo = lrsServerInfo;
1✔
351
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
352
        this.tlsContext = tlsContext;
1✔
353
        this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
354
        this.outlierDetection = outlierDetection;
1✔
355
      }
1✔
356

357
      abstract void start();
358

359
      void shutdown() {
360
        shutdown = true;
1✔
361
      }
1✔
362
    }
363

364
    private final class EdsClusterState extends ClusterState implements ResourceWatcher<EdsUpdate> {
365
      @Nullable
366
      private final String edsServiceName;
367
      private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
1✔
368
      int priorityNameGenId = 1;
1✔
369

370
      private EdsClusterState(String name, @Nullable String edsServiceName,
371
          @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
372
          @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
373
          @Nullable OutlierDetection outlierDetection) {
1✔
374
        super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
1✔
375
            outlierDetection);
376
        this.edsServiceName = edsServiceName;
1✔
377
      }
1✔
378

379
      @Override
380
      void start() {
381
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
382
        logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
1✔
383
        xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
1✔
384
            resourceName, this, syncContext);
1✔
385
      }
1✔
386

387
      @Override
388
      protected void shutdown() {
389
        super.shutdown();
1✔
390
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
391
        logger.log(XdsLogLevel.INFO, "Stop watching EDS resource {0}", resourceName);
1✔
392
        xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), resourceName, this);
1✔
393
      }
1✔
394

395
      @Override
396
      public void onChanged(final EdsUpdate update) {
397
        class EndpointsUpdated implements Runnable {
1✔
398
          @Override
399
          public void run() {
400
            if (shutdown) {
1✔
401
              return;
×
402
            }
403
            logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update);
1✔
404
            if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
405
              logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories",
×
406
                  update.clusterName, update.localityLbEndpointsMap.size(),
×
407
                  update.dropPolicies.size());
×
408
            }
409
            Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
1✔
410
                update.localityLbEndpointsMap;
411
            List<DropOverload> dropOverloads = update.dropPolicies;
1✔
412
            List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
413
            Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
1✔
414
            List<String> sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints);
1✔
415
            for (Locality locality : localityLbEndpoints.keySet()) {
1✔
416
              LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
1✔
417
              String priorityName = localityPriorityNames.get(locality);
1✔
418
              boolean discard = true;
1✔
419
              for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
1✔
420
                if (endpoint.isHealthy()) {
1✔
421
                  discard = false;
1✔
422
                  long weight = localityLbInfo.localityWeight();
1✔
423
                  if (endpoint.loadBalancingWeight() != 0) {
1✔
424
                    weight *= endpoint.loadBalancingWeight();
1✔
425
                  }
426
                  String localityName = localityName(locality);
1✔
427
                  Attributes attr =
1✔
428
                      endpoint.eag().getAttributes().toBuilder()
1✔
429
                          .set(XdsAttributes.ATTR_LOCALITY, locality)
1✔
430
                          .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
1✔
431
                          .set(XdsAttributes.ATTR_LOCALITY_WEIGHT,
1✔
432
                              localityLbInfo.localityWeight())
1✔
433
                          .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
1✔
434
                          .set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
1✔
435
                          .build();
1✔
436

437
                  EquivalentAddressGroup eag;
438
                  if (config.isHttp11ProxyAvailable()) {
1✔
439
                    List<SocketAddress> rewrittenAddresses = new ArrayList<>();
1✔
440
                    for (SocketAddress addr : endpoint.eag().getAddresses()) {
1✔
441
                      rewrittenAddresses.add(rewriteAddress(
1✔
442
                          addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
1✔
443
                    }
1✔
444
                    eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
1✔
445
                  } else {
1✔
446
                    eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
1✔
447
                  }
448
                  eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
449
                  addresses.add(eag);
1✔
450
                }
451
              }
1✔
452
              if (discard) {
1✔
453
                logger.log(XdsLogLevel.INFO,
1✔
454
                    "Discard locality {0} with 0 healthy endpoints", locality);
455
                continue;
1✔
456
              }
457
              if (!prioritizedLocalityWeights.containsKey(priorityName)) {
1✔
458
                prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
1✔
459
              }
460
              prioritizedLocalityWeights.get(priorityName).put(
1✔
461
                  locality, localityLbInfo.localityWeight());
1✔
462
            }
1✔
463
            if (prioritizedLocalityWeights.isEmpty()) {
1✔
464
              // Will still update the result, as if the cluster resource is revoked.
465
              logger.log(XdsLogLevel.INFO,
1✔
466
                  "Cluster {0} has no usable priority/locality/endpoint", update.clusterName);
467
            }
468
            sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
1✔
469
            Map<String, PriorityChildConfig> priorityChildConfigs =
1✔
470
                generateEdsBasedPriorityChildConfigs(
1✔
471
                    name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
1✔
472
                    filterMetadata, outlierDetection, endpointLbConfig, lbRegistry,
1✔
473
                    prioritizedLocalityWeights, dropOverloads);
474
            status = Status.OK;
1✔
475
            resolved = true;
1✔
476
            result = new ClusterResolutionResult(addresses, priorityChildConfigs,
1✔
477
                sortedPriorityNames);
478
            handleEndpointResourceUpdate();
1✔
479
          }
1✔
480
        }
481

482
        new EndpointsUpdated().run();
1✔
483
      }
1✔
484

485
      private SocketAddress rewriteAddress(SocketAddress addr,
486
          ImmutableMap<String, Object> endpointMetadata,
487
          ImmutableMap<String, Object> localityMetadata) {
488
        if (!(addr instanceof InetSocketAddress)) {
1✔
489
          return addr;
×
490
        }
491

492
        SocketAddress proxyAddress;
493
        try {
494
          proxyAddress = (SocketAddress) endpointMetadata.get(
1✔
495
              "envoy.http11_proxy_transport_socket.proxy_address");
496
          if (proxyAddress == null) {
1✔
497
            proxyAddress = (SocketAddress) localityMetadata.get(
1✔
498
                "envoy.http11_proxy_transport_socket.proxy_address");
499
          }
500
        } catch (ClassCastException e) {
×
501
          return addr;
×
502
        }
1✔
503

504
        if (proxyAddress == null) {
1✔
505
          return addr;
×
506
        }
507

508
        return HttpConnectProxiedSocketAddress.newBuilder()
1✔
509
            .setTargetAddress((InetSocketAddress) addr)
1✔
510
            .setProxyAddress(proxyAddress)
1✔
511
            .build();
1✔
512
      }
513

514
      private List<String> generatePriorityNames(String name,
515
          Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
516
        TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
1✔
517
        for (Locality locality : localityLbEndpoints.keySet()) {
1✔
518
          int priority = localityLbEndpoints.get(locality).priority();
1✔
519
          if (!todo.containsKey(priority)) {
1✔
520
            todo.put(priority, new ArrayList<>());
1✔
521
          }
522
          todo.get(priority).add(locality);
1✔
523
        }
1✔
524
        Map<Locality, String> newNames = new HashMap<>();
1✔
525
        Set<String> usedNames = new HashSet<>();
1✔
526
        List<String> ret = new ArrayList<>();
1✔
527
        for (Integer priority: todo.keySet()) {
1✔
528
          String foundName = "";
1✔
529
          for (Locality locality : todo.get(priority)) {
1✔
530
            if (localityPriorityNames.containsKey(locality)
1✔
531
                && usedNames.add(localityPriorityNames.get(locality))) {
1✔
532
              foundName = localityPriorityNames.get(locality);
1✔
533
              break;
1✔
534
            }
535
          }
1✔
536
          if ("".equals(foundName)) {
1✔
537
            foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++);
1✔
538
          }
539
          for (Locality locality : todo.get(priority)) {
1✔
540
            newNames.put(locality, foundName);
1✔
541
          }
1✔
542
          ret.add(foundName);
1✔
543
        }
1✔
544
        localityPriorityNames = newNames;
1✔
545
        return ret;
1✔
546
      }
547

548
      @Override
549
      public void onResourceDoesNotExist(final String resourceName) {
550
        if (shutdown) {
1✔
551
          return;
×
552
        }
553
        logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
1✔
554
        status = Status.OK;
1✔
555
        resolved = true;
1✔
556
        result = null;  // resource revoked
1✔
557
        handleEndpointResourceUpdate();
1✔
558
      }
1✔
559

560
      @Override
561
      public void onError(final Status error) {
562
        if (shutdown) {
1✔
563
          return;
×
564
        }
565
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
566
        status = Status.UNAVAILABLE
1✔
567
            .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
1✔
568
                  resourceName, error.getCode(), error.getDescription()))
1✔
569
            .withCause(error.getCause());
1✔
570
        logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
1✔
571
        handleEndpointResolutionError();
1✔
572
      }
1✔
573
    }
574

575
    private final class LogicalDnsClusterState extends ClusterState {
576
      private final String dnsHostName;
577
      private final NameResolver.Factory nameResolverFactory;
578
      private final NameResolver.Args nameResolverArgs;
579
      private NameResolver resolver;
580
      @Nullable
581
      private BackoffPolicy backoffPolicy;
582
      @Nullable
583
      private ScheduledHandle scheduledRefresh;
584

585
      private LogicalDnsClusterState(String name, String dnsHostName,
586
          @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
587
          @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
1✔
588
        super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
1✔
589
        this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName");
1✔
590
        nameResolverFactory =
1✔
591
            checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory");
1✔
592
        nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs");
1✔
593
      }
1✔
594

595
      @Override
596
      void start() {
597
        URI uri;
598
        try {
599
          uri = new URI("dns", "", "/" + dnsHostName, null);
1✔
600
        } catch (URISyntaxException e) {
×
601
          status = Status.INTERNAL.withDescription(
×
602
              "Bug, invalid URI creation: " + dnsHostName).withCause(e);
×
603
          handleEndpointResolutionError();
×
604
          return;
×
605
        }
1✔
606
        resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs);
1✔
607
        if (resolver == null) {
1✔
608
          status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS "
×
609
              + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri);
610
          handleEndpointResolutionError();
×
611
          return;
×
612
        }
613
        resolver.start(new NameResolverListener(dnsHostName));
1✔
614
      }
1✔
615

616
      void refresh() {
617
        if (resolver == null) {
1✔
618
          return;
×
619
        }
620
        cancelBackoff();
1✔
621
        resolver.refresh();
1✔
622
      }
1✔
623

624
      @Override
625
      void shutdown() {
626
        super.shutdown();
1✔
627
        if (resolver != null) {
1✔
628
          resolver.shutdown();
1✔
629
        }
630
        cancelBackoff();
1✔
631
      }
1✔
632

633
      private void cancelBackoff() {
634
        if (scheduledRefresh != null) {
1✔
635
          scheduledRefresh.cancel();
1✔
636
          scheduledRefresh = null;
1✔
637
          backoffPolicy = null;
1✔
638
        }
639
      }
1✔
640

641
      private class DelayedNameResolverRefresh implements Runnable {
1✔
642
        @Override
643
        public void run() {
644
          scheduledRefresh = null;
1✔
645
          if (!shutdown) {
1✔
646
            resolver.refresh();
1✔
647
          }
648
        }
1✔
649
      }
650

651
      private class NameResolverListener extends NameResolver.Listener2 {
652
        private final String dnsHostName;
653

654
        NameResolverListener(String dnsHostName) {
1✔
655
          this.dnsHostName = dnsHostName;
1✔
656
        }
1✔
657

658
        @Override
659
        public void onResult(final ResolutionResult resolutionResult) {
660
          class NameResolved implements Runnable {
1✔
661
            @Override
662
            public void run() {
663
              if (shutdown) {
1✔
664
                return;
×
665
              }
666
              backoffPolicy = null;  // reset backoff sequence if succeeded
1✔
667
              // Arbitrary priority notation for all DNS-resolved endpoints.
668
              String priorityName = priorityName(name, 0);  // value doesn't matter
1✔
669
              List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
670
              for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
1✔
671
                // No weight attribute is attached, all endpoint-level LB policy should be able
672
                // to handle such it.
673
                String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
1✔
674
                Attributes attr = eag.getAttributes().toBuilder()
1✔
675
                    .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
1✔
676
                    .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
1✔
677
                    .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
1✔
678
                    .build();
1✔
679
                eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
1✔
680
                eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
681
                addresses.add(eag);
1✔
682
              }
1✔
683
              PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
1✔
684
                  name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
685
                  lbRegistry, Collections.<DropOverload>emptyList());
1✔
686
              status = Status.OK;
1✔
687
              resolved = true;
1✔
688
              result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
1✔
689
              handleEndpointResourceUpdate();
1✔
690
            }
1✔
691
          }
692

693
          syncContext.execute(new NameResolved());
1✔
694
        }
1✔
695

696
        @Override
697
        public void onError(final Status error) {
698
          syncContext.execute(new Runnable() {
1✔
699
            @Override
700
            public void run() {
701
              if (shutdown) {
1✔
702
                return;
×
703
              }
704
              status = error;
1✔
705
              // NameResolver.Listener API cannot distinguish between address-not-found and
706
              // transient errors. If the error occurs in the first resolution, treat it as
707
              // address not found. Otherwise, either there is previously resolved addresses
708
              // previously encountered error, propagate the error to downstream/upstream and
709
              // let downstream/upstream handle it.
710
              if (!resolved) {
1✔
711
                resolved = true;
1✔
712
                handleEndpointResourceUpdate();
1✔
713
              } else {
714
                handleEndpointResolutionError();
1✔
715
              }
716
              if (scheduledRefresh != null && scheduledRefresh.isPending()) {
1✔
717
                return;
×
718
              }
719
              if (backoffPolicy == null) {
1✔
720
                backoffPolicy = backoffPolicyProvider.get();
1✔
721
              }
722
              long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
723
              logger.log(XdsLogLevel.DEBUG,
1✔
724
                  "Logical DNS resolver for cluster {0} encountered name resolution "
725
                      + "error: {1}, scheduling DNS resolution backoff for {2} ns",
726
                  name, error, delayNanos);
1✔
727
              scheduledRefresh =
1✔
728
                  syncContext.schedule(
1✔
729
                      new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
730
                      timeService);
1✔
731
            }
1✔
732
          });
733
        }
1✔
734
      }
735
    }
736
  }
737

738
  private static class ClusterResolutionResult {
739
    // Endpoint addresses.
740
    private final List<EquivalentAddressGroup> addresses;
741
    // Config (include load balancing policy/config) for each priority in the cluster.
742
    private final Map<String, PriorityChildConfig> priorityChildConfigs;
743
    // List of priority names ordered in descending priorities.
744
    private final List<String> priorities;
745

746
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
747
        PriorityChildConfig config) {
748
      this(addresses, Collections.singletonMap(priority, config),
1✔
749
          Collections.singletonList(priority));
1✔
750
    }
1✔
751

752
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
753
        Map<String, PriorityChildConfig> configs, List<String> priorities) {
1✔
754
      this.addresses = addresses;
1✔
755
      this.priorityChildConfigs = configs;
1✔
756
      this.priorities = priorities;
1✔
757
    }
1✔
758
  }
759

760
  /**
761
   * Generates the config to be used in the priority LB policy for the single priority of
762
   * logical DNS cluster.
763
   *
764
   * <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
765
   */
766
  private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
767
      String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
768
      @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
769
      LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) {
770
    // Override endpoint-level LB policy with pick_first for logical DNS cluster.
771
    Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
772
        lbRegistry.getProvider("pick_first"), null);
1✔
773
    ClusterImplConfig clusterImplConfig =
1✔
774
        new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests,
775
            dropOverloads, endpointLbConfig, tlsContext, filterMetadata);
776
    LoadBalancerProvider clusterImplLbProvider =
1✔
777
        lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
778
    Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
779
        clusterImplLbProvider, clusterImplConfig);
780
    return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
1✔
781
  }
782

783
  /**
784
   * Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
785
   *
786
   * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
787
   * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental
788
   */
789
  private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
790
      String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo,
791
      @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
792
      Map<String, Struct> filterMetadata,
793
      @Nullable OutlierDetection outlierDetection, Object endpointLbConfig,
794
      LoadBalancerRegistry lbRegistry, Map<String,
795
      Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) {
796
    Map<String, PriorityChildConfig> configs = new HashMap<>();
1✔
797
    for (String priority : prioritizedLocalityWeights.keySet()) {
1✔
798
      ClusterImplConfig clusterImplConfig =
1✔
799
          new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
800
              dropOverloads, endpointLbConfig, tlsContext, filterMetadata);
801
      LoadBalancerProvider clusterImplLbProvider =
1✔
802
          lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
803
      Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
804
          clusterImplLbProvider, clusterImplConfig);
805

806
      // If outlier detection has been configured we wrap the child policy in the outlier detection
807
      // load balancer.
808
      if (outlierDetection != null) {
1✔
809
        LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider(
1✔
810
            "outlier_detection_experimental");
811
        priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
1✔
812
            outlierDetectionProvider,
813
            buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy));
1✔
814
      }
815

816
      PriorityChildConfig priorityChildConfig =
1✔
817
          new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */);
818
      configs.put(priority, priorityChildConfig);
1✔
819
    }
1✔
820
    return configs;
1✔
821
  }
822

823
  /**
824
   * Converts {@link OutlierDetection} that represents the xDS configuration to {@link
825
   * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer}
826
   * understands.
827
   */
828
  private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig(
829
      OutlierDetection outlierDetection, Object childConfig) {
830
    OutlierDetectionLoadBalancerConfig.Builder configBuilder
1✔
831
        = new OutlierDetectionLoadBalancerConfig.Builder();
832

833
    configBuilder.setChildConfig(childConfig);
1✔
834

835
    if (outlierDetection.intervalNanos() != null) {
1✔
836
      configBuilder.setIntervalNanos(outlierDetection.intervalNanos());
1✔
837
    }
838
    if (outlierDetection.baseEjectionTimeNanos() != null) {
1✔
839
      configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos());
1✔
840
    }
841
    if (outlierDetection.maxEjectionTimeNanos() != null) {
1✔
842
      configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos());
1✔
843
    }
844
    if (outlierDetection.maxEjectionPercent() != null) {
1✔
845
      configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent());
1✔
846
    }
847

848
    SuccessRateEjection successRate = outlierDetection.successRateEjection();
1✔
849
    if (successRate != null) {
1✔
850
      OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder
851
          successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
852
          .SuccessRateEjection.Builder();
853

854
      if (successRate.stdevFactor() != null) {
1✔
855
        successRateConfigBuilder.setStdevFactor(successRate.stdevFactor());
1✔
856
      }
857
      if (successRate.enforcementPercentage() != null) {
1✔
858
        successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage());
1✔
859
      }
860
      if (successRate.minimumHosts() != null) {
1✔
861
        successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts());
1✔
862
      }
863
      if (successRate.requestVolume() != null) {
1✔
864
        successRateConfigBuilder.setRequestVolume(successRate.requestVolume());
1✔
865
      }
866

867
      configBuilder.setSuccessRateEjection(successRateConfigBuilder.build());
1✔
868
    }
869

870
    FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection();
1✔
871
    if (failurePercentage != null) {
1✔
872
      OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder
873
          failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
874
          .FailurePercentageEjection.Builder();
875

876
      if (failurePercentage.threshold() != null) {
1✔
877
        failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold());
1✔
878
      }
879
      if (failurePercentage.enforcementPercentage() != null) {
1✔
880
        failurePercentageConfigBuilder.setEnforcementPercentage(
1✔
881
            failurePercentage.enforcementPercentage());
1✔
882
      }
883
      if (failurePercentage.minimumHosts() != null) {
1✔
884
        failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts());
1✔
885
      }
886
      if (failurePercentage.requestVolume() != null) {
1✔
887
        failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume());
1✔
888
      }
889

890
      configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build());
1✔
891
    }
892

893
    return configBuilder.build();
1✔
894
  }
895

896
  /**
897
   * Generates a string that represents the priority in the LB policy config. The string is unique
898
   * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2.
899
   * The ordering is undefined for priorities in different clusters.
900
   */
901
  private static String priorityName(String cluster, int priority) {
902
    return cluster + "[child" + priority + "]";
1✔
903
  }
904

905
  /**
906
   * Generates a string that represents the locality in the LB policy config. The string is unique
907
   * across all localities in all clusters.
908
   */
909
  private static String localityName(Locality locality) {
910
    return "{region=\"" + locality.region()
1✔
911
        + "\", zone=\"" + locality.zone()
1✔
912
        + "\", sub_zone=\"" + locality.subZone()
1✔
913
        + "\"}";
914
  }
915
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc