• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19870

17 Jun 2025 10:14PM UTC coverage: 88.543% (-0.01%) from 88.556%
#19870

push

github

ejona86
xds: Add logical dns cluster support to XdsDepManager

ClusterResolverLb gets the NameResolverRegistry from
LoadBalancer.Helper, so a new API was added in NameResolver.Args to
propagate the same object to the name resolver tree.

RetryingNameResolver was exposed to xds. This is expected to be
temporary, as the retrying is being removed from ManagedChannelImpl and
moved into the resolvers. At that point, DnsNameResolverProvider would
wrap DnsNameResolver with a similar API to RetryingNameResolver and xds
would no longer be responsible.

34637 of 39119 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.16
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import io.grpc.EquivalentAddressGroup;
28
import io.grpc.NameResolver;
29
import io.grpc.NameResolverProvider;
30
import io.grpc.Status;
31
import io.grpc.StatusOr;
32
import io.grpc.SynchronizationContext;
33
import io.grpc.internal.RetryingNameResolver;
34
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
35
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
36
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
37
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
38
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
39
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
40
import io.grpc.xds.client.Locality;
41
import io.grpc.xds.client.XdsClient;
42
import io.grpc.xds.client.XdsClient.ResourceWatcher;
43
import io.grpc.xds.client.XdsResourceType;
44
import java.net.URI;
45
import java.net.URISyntaxException;
46
import java.util.ArrayList;
47
import java.util.Collections;
48
import java.util.EnumMap;
49
import java.util.HashMap;
50
import java.util.HashSet;
51
import java.util.LinkedHashSet;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Objects;
55
import java.util.Set;
56
import javax.annotation.Nullable;
57

58
/**
59
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
60
 * maintains the watchers for the xds resources and when an update is received, it either requests
61
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
62
 * applies to a single data plane authority.
63
 */
64
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
65
  private enum TrackedWatcherTypeEnum {
1✔
66
    LDS, RDS, CDS, EDS, DNS
1✔
67
  }
68

69
  private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
1✔
70
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
71
  private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
1✔
72
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
73
  private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
1✔
74
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
75
  private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
1✔
76
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
77
  private static final TrackedWatcherType<List<EquivalentAddressGroup>> DNS_TYPE =
1✔
78
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.DNS);
79

80
  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
81
  // to an empty locality.
82
  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
1✔
83

84
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
85
  private final String listenerName;
86
  private final XdsClient xdsClient;
87
  private final SynchronizationContext syncContext;
88
  private final String dataPlaneAuthority;
89
  private final NameResolver.Args nameResolverArgs;
90
  private XdsConfigWatcher xdsConfigWatcher;
91

92
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
93
  private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
1✔
94
      new EnumMap<>(TrackedWatcherTypeEnum.class);
95
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
96

97
  /**
   * Creates a dependency manager. No watch is registered until {@link #start} is called.
   *
   * @throws NullPointerException if any argument is null
   */
  XdsDependencyManager(
      XdsClient xdsClient,
      SynchronizationContext syncContext,
      String dataPlaneAuthority,
      String listenerName,
      NameResolver.Args nameResolverArgs) {
    // Validate first (same order as the assignments below), then store.
    checkNotNull(listenerName, "listenerName");
    checkNotNull(xdsClient, "xdsClient");
    checkNotNull(syncContext, "syncContext");
    checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
    checkNotNull(nameResolverArgs, "nameResolverArgs");

    this.listenerName = listenerName;
    this.xdsClient = xdsClient;
    this.syncContext = syncContext;
    this.dataPlaneAuthority = dataPlaneAuthority;
    this.nameResolverArgs = nameResolverArgs;
  }
1✔
109

110
  public static String toContextStr(String typeName, String resourceName) {
111
    return typeName + " resource " + resourceName;
1✔
112
  }
113

114
  public void start(XdsConfigWatcher xdsConfigWatcher) {
115
    checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
1✔
116
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
117
    // start the ball rolling
118
    syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
1✔
119
  }
1✔
120

121
  @Override
122
  public XdsConfig.Subscription subscribeToCluster(String clusterName) {
123
    checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
1✔
124
    checkNotNull(clusterName, "clusterName");
1✔
125
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
126

127
    syncContext.execute(() -> {
1✔
128
      if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
129
        subscription.closed = true;
1✔
130
        return; // shutdown() called
1✔
131
      }
132
      subscriptions.add(subscription);
1✔
133
      addClusterWatcher(clusterName);
1✔
134
    });
1✔
135

136
    return subscription;
1✔
137
  }
138

139
  /**
140
   * For all logical dns clusters refresh their results.
141
   */
142
  public void requestReresolution() {
143
    syncContext.execute(() -> {
×
144
      for (TrackedWatcher<List<EquivalentAddressGroup>> watcher : getWatchers(DNS_TYPE).values()) {
×
145
        DnsWatcher dnsWatcher = (DnsWatcher) watcher;
×
146
        dnsWatcher.refresh();
×
147
      }
×
148
    });
×
149
  }
×
150

151
  private <T extends ResourceUpdate> void addWatcher(
152
      TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
153
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
154
    XdsResourceType<T> type = watcher.type;
1✔
155
    String resourceName = watcher.resourceName;
1✔
156

157
    getWatchers(watcherType).put(resourceName, watcher);
1✔
158
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
159
  }
1✔
160

161
  public void shutdown() {
162
    syncContext.execute(() -> {
1✔
163
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
164
        for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
1✔
165
          watcher.close();
1✔
166
        }
1✔
167
      }
1✔
168
      resourceWatchers.clear();
1✔
169
      subscriptions.clear();
1✔
170
    });
1✔
171
  }
1✔
172

173
  private void releaseSubscription(ClusterSubscription subscription) {
174
    checkNotNull(subscription, "subscription");
1✔
175
    syncContext.execute(() -> {
1✔
176
      if (subscription.closed) {
1✔
177
        return;
1✔
178
      }
179
      subscription.closed = true;
1✔
180
      if (!subscriptions.remove(subscription)) {
1✔
181
        return; // shutdown() called
×
182
      }
183
      maybePublishConfig();
1✔
184
    });
1✔
185
  }
1✔
186

187
  /**
188
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
189
   * the watchers.
190
   */
191
  private void maybePublishConfig() {
192
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
193
    if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
194
      return; // shutdown() called
×
195
    }
196
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
197
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
198
        .anyMatch(TrackedWatcher::missingResult);
1✔
199
    if (waitingOnResource) {
1✔
200
      return;
1✔
201
    }
202

203
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
204
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
205
      return;
1✔
206
    }
207
    assert newUpdate.hasValue()
1✔
208
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
209
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
210
    lastUpdate = newUpdate;
1✔
211
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
212
  }
1✔
213

214
  @VisibleForTesting
215
  StatusOr<XdsConfig> buildUpdate() {
216
    // Create a config and discard any watchers not accessed
217
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
218
    StatusOr<XdsConfig> config = buildUpdate(
1✔
219
        tracer, listenerName, dataPlaneAuthority, subscriptions);
220
    tracer.closeUnusedWatchers();
1✔
221
    return config;
1✔
222
  }
223

224
  private static StatusOr<XdsConfig> buildUpdate(
225
      WatcherTracer tracer,
226
      String listenerName,
227
      String dataPlaneAuthority,
228
      Set<ClusterSubscription> subscriptions) {
229
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
230

231
    // Iterate watchers and build the XdsConfig
232

233
    TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
234
        = tracer.getWatcher(LDS_TYPE, listenerName);
1✔
235
    if (ldsWatcher == null) {
1✔
236
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
237
          "Bug: No listener watcher found for " + listenerName));
238
    }
239
    if (!ldsWatcher.getData().hasValue()) {
1✔
240
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
241
    }
242
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
243
    builder.setListener(ldsUpdate);
1✔
244

245
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
246
    if (routeSource == null) {
1✔
247
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
248
          "Bug: No route source found for listener " + dataPlaneAuthority));
249
    }
250
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
251
    if (!statusOrRdsUpdate.hasValue()) {
1✔
252
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
253
    }
254
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
255
    builder.setRoute(rdsUpdate);
1✔
256

257
    VirtualHost activeVirtualHost =
1✔
258
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
259
    if (activeVirtualHost == null) {
1✔
260
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
261
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
262
    }
263
    builder.setVirtualHost(activeVirtualHost);
1✔
264

265
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
266
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
267
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
268
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
269
    }
1✔
270
    for (ClusterSubscription subscription : subscriptions) {
1✔
271
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
272
    }
1✔
273
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
274
      builder.addCluster(me.getKey(), me.getValue());
1✔
275
    }
1✔
276

277
    return StatusOr.fromValue(builder.build());
1✔
278
  }
279

280
  private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
281
    TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
282
    if (typeWatchers == null) {
1✔
283
      typeWatchers = new TypeWatchers<T>(watcherType);
1✔
284
      resourceWatchers.put(watcherType.typeEnum, typeWatchers);
1✔
285
    }
286
    assert typeWatchers.watcherType == watcherType;
1✔
287
    @SuppressWarnings("unchecked")
288
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
289
    return tTypeWatchers.watchers;
1✔
290
  }
291

292
  private static void addConfigForCluster(
293
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
294
      String clusterName,
295
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
296
      LinkedHashSet<String> ancestors,
297
      WatcherTracer tracer) {
298
    if (clusters.containsKey(clusterName)) {
1✔
299
      return;
1✔
300
    }
301
    if (ancestors.contains(clusterName)) {
1✔
302
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
303
          Status.INTERNAL.withDescription(
1✔
304
              "Aggregate cluster cycle detected: " + ancestors)));
305
      return;
1✔
306
    }
307
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
308
      clusters.put(clusterName, StatusOr.fromStatus(
×
309
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
310
      return;
×
311
    }
312

313
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
1✔
314
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
315
    if (!cdsWatcherDataOr.hasValue()) {
1✔
316
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
317
      return;
1✔
318
    }
319

320
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
321
    XdsConfig.XdsClusterConfig.ClusterChild child;
322
    switch (cdsUpdate.clusterType()) {
1✔
323
      case AGGREGATE:
324
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
325
        // preserves the priority across all aggregate clusters
326
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
327
        ancestors.add(clusterName);
1✔
328
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
329
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
330
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
331
          if (!config.hasValue()) {
1✔
332
            // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
333
            // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
334
            // watchers reports a transient ADS stream error, the policy should report that it is in
335
            // TRANSIENT_FAILURE if it has never passed a config to its child.
336
            //
337
            // But there's currently disagreement about whether that is actually what we want, and
338
            // that was not originally implemented in gRPC Java. So we're keeping Java's old
339
            // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
340
            // cycle).
341
            leafNames.add(childCluster);
1✔
342
            continue;
1✔
343
          }
344
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
345
          if (children instanceof AggregateConfig) {
1✔
346
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
347
          } else {
348
            leafNames.add(childCluster);
1✔
349
          }
350
        }
1✔
351
        ancestors.remove(clusterName);
1✔
352

353
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
354
        break;
1✔
355
      case EDS:
356
        TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
357
            tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
1✔
358
        if (edsWatcher != null) {
1✔
359
          child = new EndpointConfig(edsWatcher.getData());
1✔
360
        } else {
361
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
362
              "EDS resource not found for cluster " + clusterName)));
363
        }
364
        break;
×
365
      case LOGICAL_DNS:
366
        TrackedWatcher<List<EquivalentAddressGroup>> dnsWatcher =
1✔
367
            tracer.getWatcher(DNS_TYPE, cdsUpdate.dnsHostName());
1✔
368
        child = new EndpointConfig(dnsToEdsUpdate(dnsWatcher.getData(), cdsUpdate.dnsHostName()));
1✔
369
        break;
1✔
370
      default:
371
        child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
372
              "Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
×
373
    }
374
    if (clusters.containsKey(clusterName)) {
1✔
375
      // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
376
      // present. We don't want to overwrite it with a non-error value.
377
      return;
1✔
378
    }
379
    clusters.put(clusterName, StatusOr.fromValue(
1✔
380
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
381
  }
1✔
382

383
  private static StatusOr<XdsEndpointResource.EdsUpdate> dnsToEdsUpdate(
384
      StatusOr<List<EquivalentAddressGroup>> dnsData, String dnsHostName) {
385
    if (!dnsData.hasValue()) {
1✔
386
      return StatusOr.fromStatus(dnsData.getStatus());
1✔
387
    }
388

389
    List<Endpoints.LbEndpoint> endpoints = new ArrayList<>();
1✔
390
    for (EquivalentAddressGroup eag : dnsData.getValue()) {
1✔
391
      endpoints.add(Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of()));
1✔
392
    }
1✔
393
    LocalityLbEndpoints lbEndpoints =
1✔
394
        LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of());
1✔
395
    return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate(
1✔
396
        "fakeEds_logicalDns",
397
        Collections.singletonMap(LOGICAL_DNS_CLUSTER_LOCALITY, lbEndpoints),
1✔
398
        new ArrayList<>()));
399
  }
400

401
  private void addRdsWatcher(String resourceName) {
402
    if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
1✔
403
      return;
×
404
    }
405

406
    addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
1✔
407
  }
1✔
408

409
  private void addEdsWatcher(String edsServiceName) {
410
    if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
1✔
411
      return;
1✔
412
    }
413

414
    addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
1✔
415
  }
1✔
416

417
  private void addClusterWatcher(String clusterName) {
418
    if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
1✔
419
      return;
1✔
420
    }
421

422
    addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
1✔
423
  }
1✔
424

425
  private void addDnsWatcher(String dnsHostName) {
426
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
427
    if (getWatchers(DNS_TYPE).containsKey(dnsHostName)) {
1✔
428
      return;
×
429
    }
430

431
    DnsWatcher watcher = new DnsWatcher(dnsHostName, nameResolverArgs);
1✔
432
    getWatchers(DNS_TYPE).put(dnsHostName, watcher);
1✔
433
    watcher.start();
1✔
434
  }
1✔
435

436
  private void updateRoutes(List<VirtualHost> virtualHosts) {
437
    VirtualHost virtualHost =
1✔
438
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
439
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
440
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
441
  }
1✔
442

443
  private String nodeInfo() {
444
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
445
  }
446

447
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
448
    if (virtualHost == null) {
1✔
449
      return Collections.emptySet();
1✔
450
    }
451

452
    // Get all cluster names to which requests can be routed through the virtual host.
453
    Set<String> clusters = new HashSet<>();
1✔
454
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
455
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
456
      if (action == null) {
1✔
457
        continue;
1✔
458
      }
459
      if (action.cluster() != null) {
1✔
460
        clusters.add(action.cluster());
1✔
461
      } else if (action.weightedClusters() != null) {
1✔
462
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
463
          clusters.add(weighedCluster.name());
1✔
464
        }
1✔
465
      }
466
    }
1✔
467

468
    return clusters;
1✔
469
  }
470

471
  private static NameResolver createNameResolver(
472
      String dnsHostName,
473
      NameResolver.Args nameResolverArgs) {
474
    URI uri;
475
    try {
476
      uri = new URI("dns", "", "/" + dnsHostName, null);
1✔
477
    } catch (URISyntaxException e) {
×
478
      return new FailingNameResolver(
×
479
          Status.INTERNAL.withDescription("Bug, invalid URI creation: " + dnsHostName)
×
480
            .withCause(e));
×
481
    }
1✔
482

483
    NameResolverProvider provider =
1✔
484
        nameResolverArgs.getNameResolverRegistry().getProviderForScheme("dns");
1✔
485
    if (provider == null) {
1✔
486
      return new FailingNameResolver(
1✔
487
          Status.INTERNAL.withDescription("Could not find dns name resolver"));
1✔
488
    }
489

490
    NameResolver bareResolver = provider.newNameResolver(uri, nameResolverArgs);
1✔
491
    if (bareResolver == null) {
1✔
492
      return new FailingNameResolver(
×
493
          Status.INTERNAL.withDescription("DNS name resolver provider returned null: " + uri));
×
494
    }
495
    return RetryingNameResolver.wrap(bareResolver, nameResolverArgs);
1✔
496
  }
497

498
  private static class TypeWatchers<T> {
499
    // Key is resource name
500
    final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
1✔
501
    final TrackedWatcherType<T> watcherType;
502

503
    TypeWatchers(TrackedWatcherType<T> watcherType) {
1✔
504
      this.watcherType = checkNotNull(watcherType, "watcherType");
1✔
505
    }
1✔
506
  }
507

508
  public interface XdsConfigWatcher {
509
    /**
510
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
511
     * INTERNAL.
512
     */
513
    void onUpdate(StatusOr<XdsConfig> config);
514
  }
515

516
  private final class ClusterSubscription implements XdsConfig.Subscription {
517
    private final String clusterName;
518
    boolean closed; // Accessed from syncContext
519

520
    public ClusterSubscription(String clusterName) {
1✔
521
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
522
    }
1✔
523

524
    String getClusterName() {
525
      return clusterName;
1✔
526
    }
527

528
    @Override
529
    public void close() {
530
      releaseSubscription(this);
1✔
531
    }
1✔
532
  }
533

534
  /** State for tracing garbage collector. */
535
  private static final class WatcherTracer {
1✔
536
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
537
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
538

539
    public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
1✔
540
      this.resourceWatchers = resourceWatchers;
1✔
541

542
      this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
1✔
543
      for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
1✔
544
        usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
1✔
545
      }
1✔
546
    }
1✔
547

548
    private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
549
      return new TypeWatchers<T>(type);
1✔
550
    }
551

552
    public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
553
      TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
554
      if (typeWatchers == null) {
1✔
555
        return null;
×
556
      }
557
      assert typeWatchers.watcherType == watcherType;
1✔
558
      @SuppressWarnings("unchecked")
559
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
560
      TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
1✔
561
      if (watcher == null) {
1✔
562
        return null;
×
563
      }
564
      @SuppressWarnings("unchecked")
565
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
1✔
566
      usedTypeWatchers.watchers.put(name, watcher);
1✔
567
      return watcher;
1✔
568
    }
569

570
    /** Shut down unused watchers. */
571
    public void closeUnusedWatchers() {
572
      boolean changed = false; // Help out the GC by preferring old objects
1✔
573
      for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
1✔
574
        TypeWatchers<?> orig = resourceWatchers.get(key);
1✔
575
        TypeWatchers<?> used = usedWatchers.get(key);
1✔
576
        for (String name : orig.watchers.keySet()) {
1✔
577
          if (used.watchers.containsKey(name)) {
1✔
578
            continue;
1✔
579
          }
580
          orig.watchers.get(name).close();
1✔
581
          changed = true;
1✔
582
        }
1✔
583
      }
1✔
584
      if (changed) {
1✔
585
        resourceWatchers.putAll(usedWatchers);
1✔
586
      }
587
    }
1✔
588
  }
589

590
  @SuppressWarnings("UnusedTypeParameter")
591
  private static final class TrackedWatcherType<T> {
592
    public final TrackedWatcherTypeEnum typeEnum;
593

594
    public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
1✔
595
      this.typeEnum = checkNotNull(typeEnum, "typeEnum");
1✔
596
    }
1✔
597
  }
598

599
  private interface TrackedWatcher<T> {
600
    @Nullable
601
    StatusOr<T> getData();
602

603
    default boolean missingResult() {
604
      return getData() == null;
1✔
605
    }
606

607
    default boolean hasDataValue() {
608
      StatusOr<T> data = getData();
1✔
609
      return data != null && data.hasValue();
1✔
610
    }
611

612
    void close();
613
  }
614

615
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
616
      implements ResourceWatcher<T>, TrackedWatcher<T> {
617
    private final XdsResourceType<T> type;
618
    private final String resourceName;
619
    boolean cancelled;
620

621
    @Nullable
622
    private StatusOr<T> data;
623

624

625
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
626
      this.type = checkNotNull(type, "type");
1✔
627
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
628
    }
1✔
629

630
    @Override
631
    public void onError(Status error) {
632
      checkNotNull(error, "error");
1✔
633
      if (cancelled) {
1✔
634
        return;
×
635
      }
636
      // Don't update configuration on error, if we've already received configuration
637
      if (!hasDataValue()) {
1✔
638
        this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
639
            String.format("Error retrieving %s: %s: %s",
1✔
640
              toContextString(), error.getCode(), error.getDescription())));
1✔
641
        maybePublishConfig();
1✔
642
      }
643
    }
1✔
644

645
    @Override
646
    public void onResourceDoesNotExist(String resourceName) {
647
      if (cancelled) {
1✔
648
        return;
×
649
      }
650

651
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
652
      this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
653
          toContextString() + " does not exist" + nodeInfo()));
1✔
654
      maybePublishConfig();
1✔
655
    }
1✔
656

657
    @Override
658
    public void onChanged(T update) {
659
      checkNotNull(update, "update");
1✔
660
      if (cancelled) {
1✔
661
        return;
1✔
662
      }
663

664
      this.data = StatusOr.fromValue(update);
1✔
665
      subscribeToChildren(update);
1✔
666
      maybePublishConfig();
1✔
667
    }
1✔
668

669
    protected abstract void subscribeToChildren(T update);
670

671
    @Override
672
    public void close() {
673
      cancelled = true;
1✔
674
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
675
    }
1✔
676

677
    @Override
678
    @Nullable
679
    public StatusOr<T> getData() {
680
      return data;
1✔
681
    }
682

683
    public String toContextString() {
684
      return toContextStr(type.typeName(), resourceName);
1✔
685
    }
686
  }
687

688
  private interface RdsUpdateSupplier {
689
    StatusOr<RdsUpdate> getRdsUpdate();
690
  }
691

692
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
693
      implements RdsUpdateSupplier {
694

695
    private LdsWatcher(String resourceName) {
1✔
696
      super(XdsListenerResource.getInstance(), resourceName);
1✔
697
    }
1✔
698

699
    @Override
700
    public void subscribeToChildren(XdsListenerResource.LdsUpdate update) {
701
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
702
      List<VirtualHost> virtualHosts;
703
      if (httpConnectionManager == null) {
1✔
704
        // TCP listener. Unsupported config
705
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
706
      } else {
707
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
708
      }
709
      if (virtualHosts != null) {
1✔
710
        updateRoutes(virtualHosts);
1✔
711
      }
712

713
      String rdsName = getRdsName(update);
1✔
714
      if (rdsName != null) {
1✔
715
        addRdsWatcher(rdsName);
1✔
716
      }
717
    }
1✔
718

719
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
720
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
721
      if (httpConnectionManager == null) {
1✔
722
        // TCP listener. Unsupported config
723
        return null;
1✔
724
      }
725
      return httpConnectionManager.rdsName();
1✔
726
    }
727

728
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
729
      String rdsName = getRdsName(update);
1✔
730
      if (rdsName == null) {
1✔
731
        return null;
×
732
      }
733
      return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
1✔
734
    }
735

736
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
737
      if (!hasDataValue()) {
1✔
738
        return this;
×
739
      }
740
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
741
      if (hcm == null) {
1✔
742
        return this;
1✔
743
      }
744
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
745
      if (virtualHosts != null) {
1✔
746
        return this;
1✔
747
      }
748
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
749
      assert rdsWatcher != null;
1✔
750
      return rdsWatcher;
1✔
751
    }
752

753
    @Override
754
    public StatusOr<RdsUpdate> getRdsUpdate() {
755
      if (missingResult()) {
1✔
756
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
757
      }
758
      if (!getData().hasValue()) {
1✔
759
        return StatusOr.fromStatus(getData().getStatus());
×
760
      }
761
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
762
      if (hcm == null) {
1✔
763
        return StatusOr.fromStatus(
1✔
764
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
765
      }
766
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
767
      if (virtualHosts == null) {
1✔
768
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
769
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
770
        // bug
771
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
772
      }
773
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
774
    }
775
  }
776

777
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
778

779
    public RdsWatcher(String resourceName) {
1✔
780
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
781
    }
1✔
782

783
    @Override
784
    public void subscribeToChildren(RdsUpdate update) {
785
      updateRoutes(update.virtualHosts);
1✔
786
    }
1✔
787

788
    @Override
789
    public StatusOr<RdsUpdate> getRdsUpdate() {
790
      if (missingResult()) {
1✔
791
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
792
      }
793
      return getData();
1✔
794
    }
795
  }
796

797
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
798
    CdsWatcher(String resourceName) {
1✔
799
      super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
800
    }
1✔
801

802
    @Override
803
    public void subscribeToChildren(XdsClusterResource.CdsUpdate update) {
804
      switch (update.clusterType()) {
1✔
805
        case EDS:
806
          addEdsWatcher(getEdsServiceName());
1✔
807
          break;
1✔
808
        case LOGICAL_DNS:
809
          addDnsWatcher(update.dnsHostName());
1✔
810
          break;
1✔
811
        case AGGREGATE:
812
          update.prioritizedClusterNames()
1✔
813
              .forEach(name -> addClusterWatcher(name));
1✔
814
          break;
1✔
815
        default:
816
      }
817
    }
1✔
818

819
    public String getEdsServiceName() {
820
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
821
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
822
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
823
      if (edsServiceName == null) {
1✔
824
        edsServiceName = cdsUpdate.clusterName();
×
825
      }
826
      return edsServiceName;
1✔
827
    }
828
  }
829

830
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
831
    private EdsWatcher(String resourceName) {
1✔
832
      super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
833
    }
1✔
834

835
    @Override
836
    public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
1✔
837
  }
838

839
  private final class DnsWatcher implements TrackedWatcher<List<EquivalentAddressGroup>> {
840
    private final NameResolver resolver;
841
    @Nullable
842
    private StatusOr<List<EquivalentAddressGroup>> data;
843
    private boolean cancelled;
844

845
    public DnsWatcher(String dnsHostName, NameResolver.Args nameResolverArgs) {
1✔
846
      this.resolver = createNameResolver(dnsHostName, nameResolverArgs);
1✔
847
    }
1✔
848

849
    public void start() {
850
      resolver.start(new NameResolverListener());
1✔
851
    }
1✔
852

853
    public void refresh() {
854
      if (cancelled) {
×
855
        return;
×
856
      }
857
      resolver.refresh();
×
858
    }
×
859

860
    @Override
861
    @Nullable
862
    public StatusOr<List<EquivalentAddressGroup>> getData() {
863
      return data;
1✔
864
    }
865

866
    @Override
867
    public void close() {
868
      if (cancelled) {
1✔
869
        return;
×
870
      }
871
      cancelled = true;
1✔
872
      resolver.shutdown();
1✔
873
    }
1✔
874

875
    private class NameResolverListener extends NameResolver.Listener2 {
1✔
876
      @Override
877
      public void onResult(final NameResolver.ResolutionResult resolutionResult) {
878
        syncContext.execute(() -> onResult2(resolutionResult));
×
879
      }
×
880

881
      @Override
882
      public Status onResult2(final NameResolver.ResolutionResult resolutionResult) {
883
        if (cancelled) {
1✔
884
          return Status.OK;
×
885
        }
886
        data = resolutionResult.getAddressesOrError();
1✔
887
        maybePublishConfig();
1✔
888
        return resolutionResult.getAddressesOrError().getStatus();
1✔
889
      }
890

891
      @Override
892
      public void onError(final Status error) {
893
        syncContext.execute(new Runnable() {
1✔
894
          @Override
895
          public void run() {
896
            if (cancelled) {
1✔
897
              return;
×
898
            }
899
            // DnsNameResolver cannot distinguish between address-not-found and transient errors.
900
            // Assume it is a transient error.
901
            // TODO: Once the resolution note API is available, don't throw away the error if
902
            // hasDataValue(); pass it as the note instead
903
            if (!hasDataValue()) {
1✔
904
              data = StatusOr.fromStatus(error);
1✔
905
              maybePublishConfig();
1✔
906
            }
907
          }
1✔
908
        });
909
      }
1✔
910
    }
911
  }
912

913
  private static final class FailingNameResolver extends NameResolver {
914
    private final Status status;
915

916
    public FailingNameResolver(Status status) {
1✔
917
      checkNotNull(status, "status");
1✔
918
      checkArgument(!status.isOk(), "Status must not be OK");
1✔
919
      this.status = status;
1✔
920
    }
1✔
921

922
    @Override
923
    public void start(Listener2 listener) {
924
      listener.onError(status);
1✔
925
    }
1✔
926

927
    @Override
928
    public String getServiceAuthority() {
929
      return "bug-if-you-see-this-authority";
×
930
    }
931

932
    @Override
933
    public void shutdown() {}
1✔
934
  }
935
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc