• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19211

08 May 2024 10:50PM UTC coverage: 88.309% (-0.02%) from 88.328%
#19211

push

github

ejona86
xds: Plumb locality in xds_cluster_impl and weighted_target

As part of gRFC A78:

> To support the locality label in the WRR metrics, we will extend the
> `weighted_target` LB policy (see A28) to define a resolver attribute
> that indicates the name of its child. This attribute will be passed
> down to each of its children with the appropriate value, so that any
> LB policy that sits underneath the `weighted_target` policy will be
> able to use it.

xds_cluster_impl is involved because it uses the child names in the
AddressFilter, which must match the names used by weighted_target.
Instead of using Locality.toString() in multiple policies and assuming
the policies agree, we now have xds_cluster_impl decide the locality's
name and pass it down explicitly. This allows us to change the name
format to match gRFC A78:

> If locality information is available, the value of this label will be
> of the form `{region="${REGION}", zone="${ZONE}",
> sub_zone="${SUB_ZONE}"}`, where `${REGION}`, `${ZONE}`, and
> `${SUB_ZONE}` are replaced with the actual values. If no locality
> information is available, the label will be set to the empty string.

31515 of 35687 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.63
/../xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.Attributes;
25
import io.grpc.EquivalentAddressGroup;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer;
28
import io.grpc.LoadBalancerProvider;
29
import io.grpc.LoadBalancerRegistry;
30
import io.grpc.NameResolver;
31
import io.grpc.NameResolver.ResolutionResult;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.SynchronizationContext.ScheduledHandle;
35
import io.grpc.internal.BackoffPolicy;
36
import io.grpc.internal.ExponentialBackoffPolicy;
37
import io.grpc.internal.ObjectPool;
38
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
39
import io.grpc.util.ForwardingLoadBalancerHelper;
40
import io.grpc.util.GracefulSwitchLoadBalancer;
41
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
42
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
43
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
44
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
45
import io.grpc.xds.Endpoints.DropOverload;
46
import io.grpc.xds.Endpoints.LbEndpoint;
47
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
48
import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection;
49
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
50
import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection;
51
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
52
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
53
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
54
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
55
import io.grpc.xds.client.Bootstrapper.ServerInfo;
56
import io.grpc.xds.client.Locality;
57
import io.grpc.xds.client.XdsClient;
58
import io.grpc.xds.client.XdsClient.ResourceWatcher;
59
import io.grpc.xds.client.XdsLogger;
60
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
61
import java.net.URI;
62
import java.net.URISyntaxException;
63
import java.util.ArrayList;
64
import java.util.Arrays;
65
import java.util.Collections;
66
import java.util.HashMap;
67
import java.util.HashSet;
68
import java.util.List;
69
import java.util.Locale;
70
import java.util.Map;
71
import java.util.Objects;
72
import java.util.Set;
73
import java.util.TreeMap;
74
import java.util.concurrent.ScheduledExecutorService;
75
import java.util.concurrent.TimeUnit;
76
import javax.annotation.Nullable;
77

78
/**
79
 * Load balancer for cluster_resolver_experimental LB policy. This LB policy is the child LB policy
80
 * of the cds_experimental LB policy and the parent LB policy of the priority_experimental LB
81
 * policy in the xDS load balancing hierarchy. This policy resolves endpoints of non-aggregate
82
 * clusters (e.g., EDS or Logical DNS) and groups endpoints in priorities and localities to be
83
 * used in the downstream LB policies for fine-grained load balancing purposes.
84
 */
85
final class ClusterResolverLoadBalancer extends LoadBalancer {
86
  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
87
  // to an empty locality.
88
  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
1✔
89
  private final XdsLogger logger;
90
  private final SynchronizationContext syncContext;
91
  private final ScheduledExecutorService timeService;
92
  private final LoadBalancerRegistry lbRegistry;
93
  private final BackoffPolicy.Provider backoffPolicyProvider;
94
  private final GracefulSwitchLoadBalancer delegate;
95
  private ObjectPool<XdsClient> xdsClientPool;
96
  private XdsClient xdsClient;
97
  private ClusterResolverConfig config;
98

99
  ClusterResolverLoadBalancer(Helper helper) {
100
    this(helper, LoadBalancerRegistry.getDefaultRegistry(),
1✔
101
        new ExponentialBackoffPolicy.Provider());
102
  }
1✔
103

104
  @VisibleForTesting
105
  ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry,
106
      BackoffPolicy.Provider backoffPolicyProvider) {
1✔
107
    this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
1✔
108
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
109
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
110
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
111
    delegate = new GracefulSwitchLoadBalancer(helper);
1✔
112
    logger = XdsLogger.withLogId(
1✔
113
        InternalLogId.allocate("cluster-resolver-lb", helper.getAuthority()));
1✔
114
    logger.log(XdsLogLevel.INFO, "Created");
1✔
115
  }
1✔
116

117
  @Override
118
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
119
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
120
    if (xdsClientPool == null) {
1✔
121
      xdsClientPool = resolvedAddresses.getAttributes().get(InternalXdsAttributes.XDS_CLIENT_POOL);
1✔
122
      xdsClient = xdsClientPool.getObject();
1✔
123
    }
124
    ClusterResolverConfig config =
1✔
125
        (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
126
    if (!Objects.equals(this.config, config)) {
1✔
127
      logger.log(XdsLogLevel.DEBUG, "Config: {0}", config);
1✔
128
      delegate.switchTo(new ClusterResolverLbStateFactory());
1✔
129
      this.config = config;
1✔
130
      delegate.handleResolvedAddresses(resolvedAddresses);
1✔
131
    }
132
    return Status.OK;
1✔
133
  }
134

135
  @Override
136
  public void handleNameResolutionError(Status error) {
137
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
138
    delegate.handleNameResolutionError(error);
1✔
139
  }
1✔
140

141
  @Override
142
  public void shutdown() {
143
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
144
    delegate.shutdown();
1✔
145
    if (xdsClientPool != null) {
1✔
146
      xdsClientPool.returnObject(xdsClient);
1✔
147
    }
148
  }
1✔
149

150
  private final class ClusterResolverLbStateFactory extends LoadBalancer.Factory {
1✔
151
    @Override
152
    public LoadBalancer newLoadBalancer(Helper helper) {
153
      return new ClusterResolverLbState(helper);
1✔
154
    }
155
  }
156

157
  /**
158
   * The state of a cluster_resolver LB working session. A new instance is created whenever
159
   * the cluster_resolver LB receives a new config. The old instance is replaced when the
160
   * new one is ready to handle new RPCs.
161
   */
162
  private final class ClusterResolverLbState extends LoadBalancer {
163
    private final Helper helper;
164
    private final List<String> clusters = new ArrayList<>();
1✔
165
    private final Map<String, ClusterState> clusterStates = new HashMap<>();
1✔
166
    private PolicySelection endpointLbPolicy;
167
    private ResolvedAddresses resolvedAddresses;
168
    private LoadBalancer childLb;
169

170
    ClusterResolverLbState(Helper helper) {
1✔
171
      this.helper = new RefreshableHelper(checkNotNull(helper, "helper"));
1✔
172
      logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState");
1✔
173
    }
1✔
174

175
    @Override
176
    public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
177
      this.resolvedAddresses = resolvedAddresses;
1✔
178
      ClusterResolverConfig config =
1✔
179
          (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
180
      endpointLbPolicy = config.lbPolicy;
1✔
181
      for (DiscoveryMechanism instance : config.discoveryMechanisms) {
1✔
182
        clusters.add(instance.cluster);
1✔
183
        ClusterState state;
184
        if (instance.type == DiscoveryMechanism.Type.EDS) {
1✔
185
          state = new EdsClusterState(instance.cluster, instance.edsServiceName,
1✔
186
              instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
187
              instance.outlierDetection);
188
        } else {  // logical DNS
189
          state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
1✔
190
              instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext);
191
        }
192
        clusterStates.put(instance.cluster, state);
1✔
193
        state.start();
1✔
194
      }
1✔
195
      return Status.OK;
1✔
196
    }
197

198
    @Override
199
    public void handleNameResolutionError(Status error) {
200
      if (childLb != null) {
1✔
201
        childLb.handleNameResolutionError(error);
1✔
202
      } else {
203
        helper.updateBalancingState(
1✔
204
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
205
      }
206
    }
1✔
207

208
    @Override
209
    public void shutdown() {
210
      for (ClusterState state : clusterStates.values()) {
1✔
211
        state.shutdown();
1✔
212
      }
1✔
213
      if (childLb != null) {
1✔
214
        childLb.shutdown();
1✔
215
      }
216
    }
1✔
217

218
    private void handleEndpointResourceUpdate() {
219
      List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
220
      Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
1✔
221
      List<String> priorities = new ArrayList<>();  // totally ordered priority list
1✔
222

223
      Status endpointNotFound = Status.OK;
1✔
224
      for (String cluster : clusters) {
1✔
225
        ClusterState state = clusterStates.get(cluster);
1✔
226
        // Propagate endpoints to the child LB policy only after all clusters have been resolved.
227
        if (!state.resolved && state.status.isOk()) {
1✔
228
          return;
1✔
229
        }
230
        if (state.result != null) {
1✔
231
          addresses.addAll(state.result.addresses);
1✔
232
          priorityChildConfigs.putAll(state.result.priorityChildConfigs);
1✔
233
          priorities.addAll(state.result.priorities);
1✔
234
        } else {
235
          endpointNotFound = state.status;
1✔
236
        }
237
      }
1✔
238
      if (addresses.isEmpty()) {
1✔
239
        if (endpointNotFound.isOk()) {
1✔
240
          endpointNotFound = Status.UNAVAILABLE.withDescription(
1✔
241
              "No usable endpoint from cluster(s): " + clusters);
242
        } else {
243
          endpointNotFound =
1✔
244
              Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
1✔
245
                  .withDescription(endpointNotFound.getDescription());
1✔
246
        }
247
        helper.updateBalancingState(
1✔
248
            TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound)));
1✔
249
        if (childLb != null) {
1✔
250
          childLb.shutdown();
1✔
251
          childLb = null;
1✔
252
        }
253
        return;
1✔
254
      }
255
      PriorityLbConfig childConfig =
1✔
256
          new PriorityLbConfig(Collections.unmodifiableMap(priorityChildConfigs),
1✔
257
              Collections.unmodifiableList(priorities));
1✔
258
      if (childLb == null) {
1✔
259
        childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper);
1✔
260
      }
261
      childLb.handleResolvedAddresses(
1✔
262
          resolvedAddresses.toBuilder()
1✔
263
              .setLoadBalancingPolicyConfig(childConfig)
1✔
264
              .setAddresses(Collections.unmodifiableList(addresses))
1✔
265
              .build());
1✔
266
    }
1✔
267

268
    private void handleEndpointResolutionError() {
269
      boolean allInError = true;
1✔
270
      Status error = null;
1✔
271
      for (String cluster : clusters) {
1✔
272
        ClusterState state = clusterStates.get(cluster);
1✔
273
        if (state.status.isOk()) {
1✔
274
          allInError = false;
1✔
275
        } else {
276
          error = state.status;
1✔
277
        }
278
      }
1✔
279
      if (allInError) {
1✔
280
        if (childLb != null) {
1✔
281
          childLb.handleNameResolutionError(error);
1✔
282
        } else {
283
          helper.updateBalancingState(
1✔
284
              TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
285
        }
286
      }
287
    }
1✔
288

289
    /**
290
     * Wires re-resolution requests from downstream LB policies with DNS resolver.
291
     */
292
    private final class RefreshableHelper extends ForwardingLoadBalancerHelper {
293
      private final Helper delegate;
294

295
      private RefreshableHelper(Helper delegate) {
1✔
296
        this.delegate = checkNotNull(delegate, "delegate");
1✔
297
      }
1✔
298

299
      @Override
300
      public void refreshNameResolution() {
301
        for (ClusterState state : clusterStates.values()) {
1✔
302
          if (state instanceof LogicalDnsClusterState) {
1✔
303
            ((LogicalDnsClusterState) state).refresh();
1✔
304
          }
305
        }
1✔
306
      }
1✔
307

308
      @Override
309
      protected Helper delegate() {
310
        return delegate;
1✔
311
      }
312
    }
313

314
    /**
315
     * Resolution state of an underlying cluster.
316
     */
317
    private abstract class ClusterState {
318
      // Name of the cluster to be resolved.
319
      protected final String name;
320
      @Nullable
321
      protected final ServerInfo lrsServerInfo;
322
      @Nullable
323
      protected final Long maxConcurrentRequests;
324
      @Nullable
325
      protected final UpstreamTlsContext tlsContext;
326
      @Nullable
327
      protected final OutlierDetection outlierDetection;
328
      // Resolution status, may contain most recent error encountered.
329
      protected Status status = Status.OK;
1✔
330
      // True if has received resolution result.
331
      protected boolean resolved;
332
      // Most recently resolved addresses and config, or null if resource not exists.
333
      @Nullable
334
      protected ClusterResolutionResult result;
335

336
      protected boolean shutdown;
337

338
      private ClusterState(String name, @Nullable ServerInfo lrsServerInfo,
339
          @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
340
          @Nullable OutlierDetection outlierDetection) {
1✔
341
        this.name = name;
1✔
342
        this.lrsServerInfo = lrsServerInfo;
1✔
343
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
344
        this.tlsContext = tlsContext;
1✔
345
        this.outlierDetection = outlierDetection;
1✔
346
      }
1✔
347

348
      abstract void start();
349

350
      void shutdown() {
351
        shutdown = true;
1✔
352
      }
1✔
353
    }
354

355
    private final class EdsClusterState extends ClusterState implements ResourceWatcher<EdsUpdate> {
356
      @Nullable
357
      private final String edsServiceName;
358
      private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
1✔
359
      int priorityNameGenId = 1;
1✔
360

361
      private EdsClusterState(String name, @Nullable String edsServiceName,
362
          @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
363
          @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection) {
1✔
364
        super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, outlierDetection);
1✔
365
        this.edsServiceName = edsServiceName;
1✔
366
      }
1✔
367

368
      @Override
369
      void start() {
370
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
371
        logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
1✔
372
        xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
1✔
373
            resourceName, this, syncContext);
1✔
374
      }
1✔
375

376
      @Override
377
      protected void shutdown() {
378
        super.shutdown();
1✔
379
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
380
        logger.log(XdsLogLevel.INFO, "Stop watching EDS resource {0}", resourceName);
1✔
381
        xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), resourceName, this);
1✔
382
      }
1✔
383

384
      @Override
385
      public void onChanged(final EdsUpdate update) {
386
        class EndpointsUpdated implements Runnable {
1✔
387
          @Override
388
          public void run() {
389
            if (shutdown) {
1✔
390
              return;
×
391
            }
392
            logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update);
1✔
393
            if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
394
              logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories",
×
395
                  update.clusterName, update.localityLbEndpointsMap.size(),
×
396
                  update.dropPolicies.size());
×
397
            }
398
            Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
1✔
399
                update.localityLbEndpointsMap;
400
            List<DropOverload> dropOverloads = update.dropPolicies;
1✔
401
            List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
402
            Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
1✔
403
            List<String> sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints);
1✔
404
            for (Locality locality : localityLbEndpoints.keySet()) {
1✔
405
              LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
1✔
406
              String priorityName = localityPriorityNames.get(locality);
1✔
407
              boolean discard = true;
1✔
408
              for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
1✔
409
                if (endpoint.isHealthy()) {
1✔
410
                  discard = false;
1✔
411
                  long weight = localityLbInfo.localityWeight();
1✔
412
                  if (endpoint.loadBalancingWeight() != 0) {
1✔
413
                    weight *= endpoint.loadBalancingWeight();
1✔
414
                  }
415
                  String localityName = localityName(locality);
1✔
416
                  Attributes attr =
1✔
417
                      endpoint.eag().getAttributes().toBuilder()
1✔
418
                          .set(InternalXdsAttributes.ATTR_LOCALITY, locality)
1✔
419
                          .set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName)
1✔
420
                          .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT,
1✔
421
                              localityLbInfo.localityWeight())
1✔
422
                          .set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight)
1✔
423
                          .build();
1✔
424
                  EquivalentAddressGroup eag = new EquivalentAddressGroup(
1✔
425
                      endpoint.eag().getAddresses(), attr);
1✔
426
                  eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
427
                  addresses.add(eag);
1✔
428
                }
429
              }
1✔
430
              if (discard) {
1✔
431
                logger.log(XdsLogLevel.INFO,
1✔
432
                    "Discard locality {0} with 0 healthy endpoints", locality);
433
                continue;
1✔
434
              }
435
              if (!prioritizedLocalityWeights.containsKey(priorityName)) {
1✔
436
                prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
1✔
437
              }
438
              prioritizedLocalityWeights.get(priorityName).put(
1✔
439
                  locality, localityLbInfo.localityWeight());
1✔
440
            }
1✔
441
            if (prioritizedLocalityWeights.isEmpty()) {
1✔
442
              // Will still update the result, as if the cluster resource is revoked.
443
              logger.log(XdsLogLevel.INFO,
1✔
444
                  "Cluster {0} has no usable priority/locality/endpoint", update.clusterName);
445
            }
446
            sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
1✔
447
            Map<String, PriorityChildConfig> priorityChildConfigs =
1✔
448
                generateEdsBasedPriorityChildConfigs(
1✔
449
                    name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
1✔
450
                    outlierDetection, endpointLbPolicy, lbRegistry, prioritizedLocalityWeights,
1✔
451
                    dropOverloads);
452
            status = Status.OK;
1✔
453
            resolved = true;
1✔
454
            result = new ClusterResolutionResult(addresses, priorityChildConfigs,
1✔
455
                sortedPriorityNames);
456
            handleEndpointResourceUpdate();
1✔
457
          }
1✔
458
        }
459

460
        new EndpointsUpdated().run();
1✔
461
      }
1✔
462

463
      private List<String> generatePriorityNames(String name,
464
          Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
465
        TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
1✔
466
        for (Locality locality : localityLbEndpoints.keySet()) {
1✔
467
          int priority = localityLbEndpoints.get(locality).priority();
1✔
468
          if (!todo.containsKey(priority)) {
1✔
469
            todo.put(priority, new ArrayList<>());
1✔
470
          }
471
          todo.get(priority).add(locality);
1✔
472
        }
1✔
473
        Map<Locality, String> newNames = new HashMap<>();
1✔
474
        Set<String> usedNames = new HashSet<>();
1✔
475
        List<String> ret = new ArrayList<>();
1✔
476
        for (Integer priority: todo.keySet()) {
1✔
477
          String foundName = "";
1✔
478
          for (Locality locality : todo.get(priority)) {
1✔
479
            if (localityPriorityNames.containsKey(locality)
1✔
480
                && usedNames.add(localityPriorityNames.get(locality))) {
1✔
481
              foundName = localityPriorityNames.get(locality);
1✔
482
              break;
1✔
483
            }
484
          }
1✔
485
          if ("".equals(foundName)) {
1✔
486
            foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++);
1✔
487
          }
488
          for (Locality locality : todo.get(priority)) {
1✔
489
            newNames.put(locality, foundName);
1✔
490
          }
1✔
491
          ret.add(foundName);
1✔
492
        }
1✔
493
        localityPriorityNames = newNames;
1✔
494
        return ret;
1✔
495
      }
496

497
      @Override
498
      public void onResourceDoesNotExist(final String resourceName) {
499
        if (shutdown) {
1✔
500
          return;
×
501
        }
502
        logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
1✔
503
        status = Status.OK;
1✔
504
        resolved = true;
1✔
505
        result = null;  // resource revoked
1✔
506
        handleEndpointResourceUpdate();
1✔
507
      }
1✔
508

509
      @Override
510
      public void onError(final Status error) {
511
        if (shutdown) {
1✔
512
          return;
×
513
        }
514
        String resourceName = edsServiceName != null ? edsServiceName : name;
1✔
515
        status = Status.UNAVAILABLE
1✔
516
            .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
1✔
517
                  resourceName, error.getCode(), error.getDescription()))
1✔
518
            .withCause(error.getCause());
1✔
519
        logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
1✔
520
        handleEndpointResolutionError();
1✔
521
      }
1✔
522
    }
523

524
    private final class LogicalDnsClusterState extends ClusterState {
525
      private final String dnsHostName;
526
      private final NameResolver.Factory nameResolverFactory;
527
      private final NameResolver.Args nameResolverArgs;
528
      private NameResolver resolver;
529
      @Nullable
530
      private BackoffPolicy backoffPolicy;
531
      @Nullable
532
      private ScheduledHandle scheduledRefresh;
533

534
      private LogicalDnsClusterState(String name, String dnsHostName,
535
          @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
536
          @Nullable UpstreamTlsContext tlsContext) {
1✔
537
        super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, null);
1✔
538
        this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName");
1✔
539
        nameResolverFactory =
1✔
540
            checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory");
1✔
541
        nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs");
1✔
542
      }
1✔
543

544
      @Override
545
      void start() {
546
        URI uri;
547
        try {
548
          uri = new URI("dns", "", "/" + dnsHostName, null);
1✔
549
        } catch (URISyntaxException e) {
×
550
          status = Status.INTERNAL.withDescription(
×
551
              "Bug, invalid URI creation: " + dnsHostName).withCause(e);
×
552
          handleEndpointResolutionError();
×
553
          return;
×
554
        }
1✔
555
        resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs);
1✔
556
        if (resolver == null) {
1✔
557
          status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS "
×
558
              + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri);
559
          handleEndpointResolutionError();
×
560
          return;
×
561
        }
562
        resolver.start(new NameResolverListener());
1✔
563
      }
1✔
564

565
      void refresh() {
566
        if (resolver == null) {
1✔
567
          return;
×
568
        }
569
        cancelBackoff();
1✔
570
        resolver.refresh();
1✔
571
      }
1✔
572

573
      @Override
574
      void shutdown() {
575
        super.shutdown();
1✔
576
        if (resolver != null) {
1✔
577
          resolver.shutdown();
1✔
578
        }
579
        cancelBackoff();
1✔
580
      }
1✔
581

582
      private void cancelBackoff() {
583
        if (scheduledRefresh != null) {
1✔
584
          scheduledRefresh.cancel();
1✔
585
          scheduledRefresh = null;
1✔
586
          backoffPolicy = null;
1✔
587
        }
588
      }
1✔
589

590
      private class DelayedNameResolverRefresh implements Runnable {
1✔
591
        @Override
592
        public void run() {
593
          scheduledRefresh = null;
1✔
594
          if (!shutdown) {
1✔
595
            resolver.refresh();
1✔
596
          }
597
        }
1✔
598
      }
599

600
      private class NameResolverListener extends NameResolver.Listener2 {
1✔
601
        @Override
602
        public void onResult(final ResolutionResult resolutionResult) {
603
          class NameResolved implements Runnable {
1✔
604
            @Override
605
            public void run() {
606
              if (shutdown) {
1✔
607
                return;
×
608
              }
609
              backoffPolicy = null;  // reset backoff sequence if succeeded
1✔
610
              // Arbitrary priority notation for all DNS-resolved endpoints.
611
              String priorityName = priorityName(name, 0);  // value doesn't matter
1✔
612
              List<EquivalentAddressGroup> addresses = new ArrayList<>();
1✔
613
              for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
1✔
614
                // No weight attribute is attached, all endpoint-level LB policy should be able
615
                // to handle such it.
616
                String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
1✔
617
                Attributes attr = eag.getAttributes().toBuilder()
1✔
618
                    .set(InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
1✔
619
                    .set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName)
1✔
620
                    .build();
1✔
621
                eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
1✔
622
                eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
1✔
623
                addresses.add(eag);
1✔
624
              }
1✔
625
              PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
1✔
626
                  name, lrsServerInfo, maxConcurrentRequests, tlsContext, lbRegistry,
1✔
627
                  Collections.<DropOverload>emptyList());
1✔
628
              status = Status.OK;
1✔
629
              resolved = true;
1✔
630
              result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
1✔
631
              handleEndpointResourceUpdate();
1✔
632
            }
1✔
633
          }
634

635
          syncContext.execute(new NameResolved());
1✔
636
        }
1✔
637

638
        @Override
639
        public void onError(final Status error) {
640
          syncContext.execute(new Runnable() {
1✔
641
            @Override
642
            public void run() {
643
              if (shutdown) {
1✔
644
                return;
×
645
              }
646
              status = error;
1✔
647
              // NameResolver.Listener API cannot distinguish between address-not-found and
648
              // transient errors. If the error occurs in the first resolution, treat it as
649
              // address not found. Otherwise, either there is previously resolved addresses
650
              // previously encountered error, propagate the error to downstream/upstream and
651
              // let downstream/upstream handle it.
652
              if (!resolved) {
1✔
653
                resolved = true;
1✔
654
                handleEndpointResourceUpdate();
1✔
655
              } else {
656
                handleEndpointResolutionError();
1✔
657
              }
658
              if (scheduledRefresh != null && scheduledRefresh.isPending()) {
1✔
659
                return;
×
660
              }
661
              if (backoffPolicy == null) {
1✔
662
                backoffPolicy = backoffPolicyProvider.get();
1✔
663
              }
664
              long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
665
              logger.log(XdsLogLevel.DEBUG,
1✔
666
                  "Logical DNS resolver for cluster {0} encountered name resolution "
667
                      + "error: {1}, scheduling DNS resolution backoff for {2} ns",
668
                  name, error, delayNanos);
1✔
669
              scheduledRefresh =
1✔
670
                  syncContext.schedule(
1✔
671
                      new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
672
                      timeService);
1✔
673
            }
1✔
674
          });
675
        }
1✔
676
      }
677
    }
678
  }
679

680
  private static class ClusterResolutionResult {
681
    // Endpoint addresses.
682
    private final List<EquivalentAddressGroup> addresses;
683
    // Config (include load balancing policy/config) for each priority in the cluster.
684
    private final Map<String, PriorityChildConfig> priorityChildConfigs;
685
    // List of priority names ordered in descending priorities.
686
    private final List<String> priorities;
687

688
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
689
        PriorityChildConfig config) {
690
      this(addresses, Collections.singletonMap(priority, config),
1✔
691
          Collections.singletonList(priority));
1✔
692
    }
1✔
693

694
    ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
695
        Map<String, PriorityChildConfig> configs, List<String> priorities) {
1✔
696
      this.addresses = addresses;
1✔
697
      this.priorityChildConfigs = configs;
1✔
698
      this.priorities = priorities;
1✔
699
    }
1✔
700
  }
701

702
  /**
703
   * Generates the config to be used in the priority LB policy for the single priority of
704
   * logical DNS cluster.
705
   *
706
   * <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
707
   */
708
  private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
709
      String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
710
      @Nullable UpstreamTlsContext tlsContext, LoadBalancerRegistry lbRegistry,
711
      List<DropOverload> dropOverloads) {
712
    // Override endpoint-level LB policy with pick_first for logical DNS cluster.
713
    PolicySelection endpointLbPolicy =
1✔
714
        new PolicySelection(lbRegistry.getProvider("pick_first"), null);
1✔
715
    ClusterImplConfig clusterImplConfig =
1✔
716
        new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests,
717
            dropOverloads, endpointLbPolicy, tlsContext);
718
    LoadBalancerProvider clusterImplLbProvider =
1✔
719
        lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
720
    PolicySelection clusterImplPolicy =
1✔
721
        new PolicySelection(clusterImplLbProvider, clusterImplConfig);
722
    return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
1✔
723
  }
724

725
  /**
726
   * Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
727
   *
728
   * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
729
   * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental
730
   */
731
  private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
732
      String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo,
733
      @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
734
      @Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy,
735
      LoadBalancerRegistry lbRegistry, Map<String,
736
      Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) {
737
    Map<String, PriorityChildConfig> configs = new HashMap<>();
1✔
738
    for (String priority : prioritizedLocalityWeights.keySet()) {
1✔
739
      ClusterImplConfig clusterImplConfig =
1✔
740
          new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
741
              dropOverloads, endpointLbPolicy, tlsContext);
742
      LoadBalancerProvider clusterImplLbProvider =
1✔
743
          lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
1✔
744
      PolicySelection priorityChildPolicy =
1✔
745
          new PolicySelection(clusterImplLbProvider, clusterImplConfig);
746

747
      // If outlier detection has been configured we wrap the child policy in the outlier detection
748
      // load balancer.
749
      if (outlierDetection != null) {
1✔
750
        LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider(
1✔
751
            "outlier_detection_experimental");
752
        priorityChildPolicy = new PolicySelection(outlierDetectionProvider,
1✔
753
            buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy));
1✔
754
      }
755

756
      PriorityChildConfig priorityChildConfig =
1✔
757
          new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */);
758
      configs.put(priority, priorityChildConfig);
1✔
759
    }
1✔
760
    return configs;
1✔
761
  }
762

763
  /**
764
   * Converts {@link OutlierDetection} that represents the xDS configuration to {@link
765
   * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer}
766
   * understands.
767
   */
768
  private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig(
769
      OutlierDetection outlierDetection, PolicySelection childPolicy) {
770
    OutlierDetectionLoadBalancerConfig.Builder configBuilder
1✔
771
        = new OutlierDetectionLoadBalancerConfig.Builder();
772

773
    configBuilder.setChildPolicy(childPolicy);
1✔
774

775
    if (outlierDetection.intervalNanos() != null) {
1✔
776
      configBuilder.setIntervalNanos(outlierDetection.intervalNanos());
1✔
777
    }
778
    if (outlierDetection.baseEjectionTimeNanos() != null) {
1✔
779
      configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos());
1✔
780
    }
781
    if (outlierDetection.maxEjectionTimeNanos() != null) {
1✔
782
      configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos());
1✔
783
    }
784
    if (outlierDetection.maxEjectionPercent() != null) {
1✔
785
      configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent());
1✔
786
    }
787

788
    SuccessRateEjection successRate = outlierDetection.successRateEjection();
1✔
789
    if (successRate != null) {
1✔
790
      OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder
791
          successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
792
          .SuccessRateEjection.Builder();
793

794
      if (successRate.stdevFactor() != null) {
1✔
795
        successRateConfigBuilder.setStdevFactor(successRate.stdevFactor());
1✔
796
      }
797
      if (successRate.enforcementPercentage() != null) {
1✔
798
        successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage());
1✔
799
      }
800
      if (successRate.minimumHosts() != null) {
1✔
801
        successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts());
1✔
802
      }
803
      if (successRate.requestVolume() != null) {
1✔
804
        successRateConfigBuilder.setRequestVolume(successRate.requestVolume());
1✔
805
      }
806

807
      configBuilder.setSuccessRateEjection(successRateConfigBuilder.build());
1✔
808
    }
809

810
    FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection();
1✔
811
    if (failurePercentage != null) {
1✔
812
      OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder
813
          failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig
1✔
814
          .FailurePercentageEjection.Builder();
815

816
      if (failurePercentage.threshold() != null) {
1✔
817
        failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold());
1✔
818
      }
819
      if (failurePercentage.enforcementPercentage() != null) {
1✔
820
        failurePercentageConfigBuilder.setEnforcementPercentage(
1✔
821
            failurePercentage.enforcementPercentage());
1✔
822
      }
823
      if (failurePercentage.minimumHosts() != null) {
1✔
824
        failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts());
1✔
825
      }
826
      if (failurePercentage.requestVolume() != null) {
1✔
827
        failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume());
1✔
828
      }
829

830
      configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build());
1✔
831
    }
832

833
    return configBuilder.build();
1✔
834
  }
835

836
  /**
837
   * Generates a string that represents the priority in the LB policy config. The string is unique
838
   * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2.
839
   * The ordering is undefined for priorities in different clusters.
840
   */
841
  private static String priorityName(String cluster, int priority) {
842
    return cluster + "[child" + priority + "]";
1✔
843
  }
844

845
  /**
846
   * Generates a string that represents the locality in the LB policy config. The string is unique
847
   * across all localities in all clusters.
848
   */
849
  private static String localityName(Locality locality) {
850
    return "{region=\"" + locality.region()
1✔
851
        + "\", zone=\"" + locality.zone()
1✔
852
        + "\", sub_zone=\"" + locality.subZone()
1✔
853
        + "\"}";
854
  }
855
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc