• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19889

24 Jun 2025 02:56PM UTC coverage: 88.521% (-0.006%) from 88.527%
#19889

push

github

ejona86
xds: Disable LOGICAL_DNS in XdsDepMan until used

ClusterResolverLb is still doing DNS itself, so disable it in XdsDepMan
until that migration has finished. EDS is fine in XdsDepman, because
XdsClient will share the result with ClusterResolverLb.

34633 of 39124 relevant lines covered (88.52%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.29
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import io.grpc.EquivalentAddressGroup;
28
import io.grpc.NameResolver;
29
import io.grpc.NameResolverProvider;
30
import io.grpc.Status;
31
import io.grpc.StatusOr;
32
import io.grpc.SynchronizationContext;
33
import io.grpc.internal.RetryingNameResolver;
34
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
35
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
36
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
37
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
38
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
39
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
40
import io.grpc.xds.client.Locality;
41
import io.grpc.xds.client.XdsClient;
42
import io.grpc.xds.client.XdsClient.ResourceWatcher;
43
import io.grpc.xds.client.XdsResourceType;
44
import java.net.URI;
45
import java.net.URISyntaxException;
46
import java.util.ArrayList;
47
import java.util.Collections;
48
import java.util.EnumMap;
49
import java.util.HashMap;
50
import java.util.HashSet;
51
import java.util.LinkedHashSet;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Objects;
55
import java.util.Set;
56
import javax.annotation.Nullable;
57

58
/**
59
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
60
 * maintains the watchers for the xds resources and when an update is received, it either requests
61
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
62
 * applies to a single data plane authority.
63
 */
64
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
65
  private enum TrackedWatcherTypeEnum {
1✔
66
    LDS, RDS, CDS, EDS, DNS
1✔
67
  }
68

69
  private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
1✔
70
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
71
  private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
1✔
72
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
73
  private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
1✔
74
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
75
  private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
1✔
76
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
77
  private static final TrackedWatcherType<List<EquivalentAddressGroup>> DNS_TYPE =
1✔
78
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.DNS);
79

80
  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
81
  // to an empty locality.
82
  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
1✔
83

84
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
85

86
  static boolean enableLogicalDns = false;
1✔
87

88
  private final String listenerName;
89
  private final XdsClient xdsClient;
90
  private final SynchronizationContext syncContext;
91
  private final String dataPlaneAuthority;
92
  private final NameResolver.Args nameResolverArgs;
93
  private XdsConfigWatcher xdsConfigWatcher;
94

95
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
96
  private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
1✔
97
      new EnumMap<>(TrackedWatcherTypeEnum.class);
98
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
99

100
  XdsDependencyManager(
101
      XdsClient xdsClient,
102
      SynchronizationContext syncContext,
103
      String dataPlaneAuthority,
104
      String listenerName,
105
      NameResolver.Args nameResolverArgs) {
1✔
106
    this.listenerName = checkNotNull(listenerName, "listenerName");
1✔
107
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
108
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
109
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
110
    this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
111
  }
1✔
112

113
  public static String toContextStr(String typeName, String resourceName) {
114
    return typeName + " resource " + resourceName;
1✔
115
  }
116

117
  public void start(XdsConfigWatcher xdsConfigWatcher) {
118
    checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
1✔
119
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
120
    // start the ball rolling
121
    syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
1✔
122
  }
1✔
123

124
  @Override
125
  public XdsConfig.Subscription subscribeToCluster(String clusterName) {
126
    checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
1✔
127
    checkNotNull(clusterName, "clusterName");
1✔
128
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
129

130
    syncContext.execute(() -> {
1✔
131
      if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
132
        subscription.closed = true;
1✔
133
        return; // shutdown() called
1✔
134
      }
135
      subscriptions.add(subscription);
1✔
136
      addClusterWatcher(clusterName);
1✔
137
    });
1✔
138

139
    return subscription;
1✔
140
  }
141

142
  /**
143
   * For all logical dns clusters refresh their results.
144
   */
145
  public void requestReresolution() {
146
    syncContext.execute(() -> {
×
147
      for (TrackedWatcher<List<EquivalentAddressGroup>> watcher : getWatchers(DNS_TYPE).values()) {
×
148
        DnsWatcher dnsWatcher = (DnsWatcher) watcher;
×
149
        dnsWatcher.refresh();
×
150
      }
×
151
    });
×
152
  }
×
153

154
  private <T extends ResourceUpdate> void addWatcher(
155
      TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
156
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
157
    XdsResourceType<T> type = watcher.type;
1✔
158
    String resourceName = watcher.resourceName;
1✔
159

160
    getWatchers(watcherType).put(resourceName, watcher);
1✔
161
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
162
  }
1✔
163

164
  public void shutdown() {
165
    syncContext.execute(() -> {
1✔
166
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
167
        for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
1✔
168
          watcher.close();
1✔
169
        }
1✔
170
      }
1✔
171
      resourceWatchers.clear();
1✔
172
      subscriptions.clear();
1✔
173
    });
1✔
174
  }
1✔
175

176
  private void releaseSubscription(ClusterSubscription subscription) {
177
    checkNotNull(subscription, "subscription");
1✔
178
    syncContext.execute(() -> {
1✔
179
      if (subscription.closed) {
1✔
180
        return;
1✔
181
      }
182
      subscription.closed = true;
1✔
183
      if (!subscriptions.remove(subscription)) {
1✔
184
        return; // shutdown() called
×
185
      }
186
      maybePublishConfig();
1✔
187
    });
1✔
188
  }
1✔
189

190
  /**
191
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
192
   * the watchers.
193
   */
194
  private void maybePublishConfig() {
195
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
196
    if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
197
      return; // shutdown() called
×
198
    }
199
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
200
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
201
        .anyMatch(TrackedWatcher::missingResult);
1✔
202
    if (waitingOnResource) {
1✔
203
      return;
1✔
204
    }
205

206
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
207
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
208
      return;
1✔
209
    }
210
    assert newUpdate.hasValue()
1✔
211
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
212
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
213
    lastUpdate = newUpdate;
1✔
214
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
215
  }
1✔
216

217
  @VisibleForTesting
218
  StatusOr<XdsConfig> buildUpdate() {
219
    // Create a config and discard any watchers not accessed
220
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
221
    StatusOr<XdsConfig> config = buildUpdate(
1✔
222
        tracer, listenerName, dataPlaneAuthority, subscriptions);
223
    tracer.closeUnusedWatchers();
1✔
224
    return config;
1✔
225
  }
226

227
  private static StatusOr<XdsConfig> buildUpdate(
228
      WatcherTracer tracer,
229
      String listenerName,
230
      String dataPlaneAuthority,
231
      Set<ClusterSubscription> subscriptions) {
232
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
233

234
    // Iterate watchers and build the XdsConfig
235

236
    TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
237
        = tracer.getWatcher(LDS_TYPE, listenerName);
1✔
238
    if (ldsWatcher == null) {
1✔
239
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
240
          "Bug: No listener watcher found for " + listenerName));
241
    }
242
    if (!ldsWatcher.getData().hasValue()) {
1✔
243
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
244
    }
245
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
246
    builder.setListener(ldsUpdate);
1✔
247

248
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
249
    if (routeSource == null) {
1✔
250
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
251
          "Bug: No route source found for listener " + dataPlaneAuthority));
252
    }
253
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
254
    if (!statusOrRdsUpdate.hasValue()) {
1✔
255
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
256
    }
257
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
258
    builder.setRoute(rdsUpdate);
1✔
259

260
    VirtualHost activeVirtualHost =
1✔
261
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
262
    if (activeVirtualHost == null) {
1✔
263
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
264
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
265
    }
266
    builder.setVirtualHost(activeVirtualHost);
1✔
267

268
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
269
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
270
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
271
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
272
    }
1✔
273
    for (ClusterSubscription subscription : subscriptions) {
1✔
274
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
275
    }
1✔
276
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
277
      builder.addCluster(me.getKey(), me.getValue());
1✔
278
    }
1✔
279

280
    return StatusOr.fromValue(builder.build());
1✔
281
  }
282

283
  private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
284
    TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
285
    if (typeWatchers == null) {
1✔
286
      typeWatchers = new TypeWatchers<T>(watcherType);
1✔
287
      resourceWatchers.put(watcherType.typeEnum, typeWatchers);
1✔
288
    }
289
    assert typeWatchers.watcherType == watcherType;
1✔
290
    @SuppressWarnings("unchecked")
291
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
292
    return tTypeWatchers.watchers;
1✔
293
  }
294

295
  private static void addConfigForCluster(
296
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
297
      String clusterName,
298
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
299
      LinkedHashSet<String> ancestors,
300
      WatcherTracer tracer) {
301
    if (clusters.containsKey(clusterName)) {
1✔
302
      return;
1✔
303
    }
304
    if (ancestors.contains(clusterName)) {
1✔
305
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
306
          Status.INTERNAL.withDescription(
1✔
307
              "Aggregate cluster cycle detected: " + ancestors)));
308
      return;
1✔
309
    }
310
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
311
      clusters.put(clusterName, StatusOr.fromStatus(
×
312
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
313
      return;
×
314
    }
315

316
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
1✔
317
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
318
    if (!cdsWatcherDataOr.hasValue()) {
1✔
319
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
320
      return;
1✔
321
    }
322

323
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
324
    XdsConfig.XdsClusterConfig.ClusterChild child;
325
    switch (cdsUpdate.clusterType()) {
1✔
326
      case AGGREGATE:
327
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
328
        // preserves the priority across all aggregate clusters
329
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
330
        ancestors.add(clusterName);
1✔
331
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
332
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
333
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
334
          if (!config.hasValue()) {
1✔
335
            // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
336
            // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
337
            // watchers reports a transient ADS stream error, the policy should report that it is in
338
            // TRANSIENT_FAILURE if it has never passed a config to its child.
339
            //
340
            // But there's currently disagreement about whether that is actually what we want, and
341
            // that was not originally implemented in gRPC Java. So we're keeping Java's old
342
            // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
343
            // cycle).
344
            leafNames.add(childCluster);
1✔
345
            continue;
1✔
346
          }
347
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
348
          if (children instanceof AggregateConfig) {
1✔
349
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
350
          } else {
351
            leafNames.add(childCluster);
1✔
352
          }
353
        }
1✔
354
        ancestors.remove(clusterName);
1✔
355

356
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
357
        break;
1✔
358
      case EDS:
359
        TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
360
            tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
1✔
361
        if (edsWatcher != null) {
1✔
362
          child = new EndpointConfig(edsWatcher.getData());
1✔
363
        } else {
364
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
365
              "EDS resource not found for cluster " + clusterName)));
366
        }
367
        break;
×
368
      case LOGICAL_DNS:
369
        if (enableLogicalDns) {
1✔
370
          TrackedWatcher<List<EquivalentAddressGroup>> dnsWatcher =
1✔
371
              tracer.getWatcher(DNS_TYPE, cdsUpdate.dnsHostName());
1✔
372
          child = new EndpointConfig(dnsToEdsUpdate(dnsWatcher.getData(), cdsUpdate.dnsHostName()));
1✔
373
        } else {
1✔
374
          child = new EndpointConfig(StatusOr.fromStatus(
1✔
375
              Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
376
        }
377
        break;
1✔
378
      default:
379
        child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
380
              "Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
×
381
    }
382
    if (clusters.containsKey(clusterName)) {
1✔
383
      // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
384
      // present. We don't want to overwrite it with a non-error value.
385
      return;
1✔
386
    }
387
    clusters.put(clusterName, StatusOr.fromValue(
1✔
388
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
389
  }
1✔
390

391
  private static StatusOr<XdsEndpointResource.EdsUpdate> dnsToEdsUpdate(
392
      StatusOr<List<EquivalentAddressGroup>> dnsData, String dnsHostName) {
393
    if (!dnsData.hasValue()) {
1✔
394
      return StatusOr.fromStatus(dnsData.getStatus());
1✔
395
    }
396

397
    List<Endpoints.LbEndpoint> endpoints = new ArrayList<>();
1✔
398
    for (EquivalentAddressGroup eag : dnsData.getValue()) {
1✔
399
      endpoints.add(Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of()));
1✔
400
    }
1✔
401
    LocalityLbEndpoints lbEndpoints =
1✔
402
        LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of());
1✔
403
    return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate(
1✔
404
        "fakeEds_logicalDns",
405
        Collections.singletonMap(LOGICAL_DNS_CLUSTER_LOCALITY, lbEndpoints),
1✔
406
        new ArrayList<>()));
407
  }
408

409
  private void addRdsWatcher(String resourceName) {
410
    if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
1✔
411
      return;
×
412
    }
413

414
    addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
1✔
415
  }
1✔
416

417
  private void addEdsWatcher(String edsServiceName) {
418
    if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
1✔
419
      return;
1✔
420
    }
421

422
    addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
1✔
423
  }
1✔
424

425
  private void addClusterWatcher(String clusterName) {
426
    if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
1✔
427
      return;
1✔
428
    }
429

430
    addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
1✔
431
  }
1✔
432

433
  private void addDnsWatcher(String dnsHostName) {
434
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
435
    if (getWatchers(DNS_TYPE).containsKey(dnsHostName)) {
1✔
436
      return;
×
437
    }
438

439
    DnsWatcher watcher = new DnsWatcher(dnsHostName, nameResolverArgs);
1✔
440
    getWatchers(DNS_TYPE).put(dnsHostName, watcher);
1✔
441
    watcher.start();
1✔
442
  }
1✔
443

444
  private void updateRoutes(List<VirtualHost> virtualHosts) {
445
    VirtualHost virtualHost =
1✔
446
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
447
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
448
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
449
  }
1✔
450

451
  private String nodeInfo() {
452
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
453
  }
454

455
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
456
    if (virtualHost == null) {
1✔
457
      return Collections.emptySet();
1✔
458
    }
459

460
    // Get all cluster names to which requests can be routed through the virtual host.
461
    Set<String> clusters = new HashSet<>();
1✔
462
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
463
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
464
      if (action == null) {
1✔
465
        continue;
1✔
466
      }
467
      if (action.cluster() != null) {
1✔
468
        clusters.add(action.cluster());
1✔
469
      } else if (action.weightedClusters() != null) {
1✔
470
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
471
          clusters.add(weighedCluster.name());
1✔
472
        }
1✔
473
      }
474
    }
1✔
475

476
    return clusters;
1✔
477
  }
478

479
  private static NameResolver createNameResolver(
480
      String dnsHostName,
481
      NameResolver.Args nameResolverArgs) {
482
    URI uri;
483
    try {
484
      uri = new URI("dns", "", "/" + dnsHostName, null);
1✔
485
    } catch (URISyntaxException e) {
×
486
      return new FailingNameResolver(
×
487
          Status.INTERNAL.withDescription("Bug, invalid URI creation: " + dnsHostName)
×
488
            .withCause(e));
×
489
    }
1✔
490

491
    NameResolverProvider provider =
1✔
492
        nameResolverArgs.getNameResolverRegistry().getProviderForScheme("dns");
1✔
493
    if (provider == null) {
1✔
494
      return new FailingNameResolver(
1✔
495
          Status.INTERNAL.withDescription("Could not find dns name resolver"));
1✔
496
    }
497

498
    NameResolver bareResolver = provider.newNameResolver(uri, nameResolverArgs);
1✔
499
    if (bareResolver == null) {
1✔
500
      return new FailingNameResolver(
×
501
          Status.INTERNAL.withDescription("DNS name resolver provider returned null: " + uri));
×
502
    }
503
    return RetryingNameResolver.wrap(bareResolver, nameResolverArgs);
1✔
504
  }
505

506
  private static class TypeWatchers<T> {
507
    // Key is resource name
508
    final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
1✔
509
    final TrackedWatcherType<T> watcherType;
510

511
    TypeWatchers(TrackedWatcherType<T> watcherType) {
1✔
512
      this.watcherType = checkNotNull(watcherType, "watcherType");
1✔
513
    }
1✔
514
  }
515

516
  public interface XdsConfigWatcher {
517
    /**
518
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
519
     * INTERNAL.
520
     */
521
    void onUpdate(StatusOr<XdsConfig> config);
522
  }
523

524
  private final class ClusterSubscription implements XdsConfig.Subscription {
525
    private final String clusterName;
526
    boolean closed; // Accessed from syncContext
527

528
    public ClusterSubscription(String clusterName) {
1✔
529
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
530
    }
1✔
531

532
    String getClusterName() {
533
      return clusterName;
1✔
534
    }
535

536
    @Override
537
    public void close() {
538
      releaseSubscription(this);
1✔
539
    }
1✔
540
  }
541

542
  /** State for tracing garbage collector. */
543
  private static final class WatcherTracer {
1✔
544
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
545
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
546

547
    public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
1✔
548
      this.resourceWatchers = resourceWatchers;
1✔
549

550
      this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
1✔
551
      for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
1✔
552
        usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
1✔
553
      }
1✔
554
    }
1✔
555

556
    private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
557
      return new TypeWatchers<T>(type);
1✔
558
    }
559

560
    public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
561
      TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
562
      if (typeWatchers == null) {
1✔
563
        return null;
×
564
      }
565
      assert typeWatchers.watcherType == watcherType;
1✔
566
      @SuppressWarnings("unchecked")
567
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
568
      TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
1✔
569
      if (watcher == null) {
1✔
570
        return null;
×
571
      }
572
      @SuppressWarnings("unchecked")
573
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
1✔
574
      usedTypeWatchers.watchers.put(name, watcher);
1✔
575
      return watcher;
1✔
576
    }
577

578
    /** Shut down unused watchers. */
579
    public void closeUnusedWatchers() {
580
      boolean changed = false; // Help out the GC by preferring old objects
1✔
581
      for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
1✔
582
        TypeWatchers<?> orig = resourceWatchers.get(key);
1✔
583
        TypeWatchers<?> used = usedWatchers.get(key);
1✔
584
        for (String name : orig.watchers.keySet()) {
1✔
585
          if (used.watchers.containsKey(name)) {
1✔
586
            continue;
1✔
587
          }
588
          orig.watchers.get(name).close();
1✔
589
          changed = true;
1✔
590
        }
1✔
591
      }
1✔
592
      if (changed) {
1✔
593
        resourceWatchers.putAll(usedWatchers);
1✔
594
      }
595
    }
1✔
596
  }
597

598
  @SuppressWarnings("UnusedTypeParameter")
599
  private static final class TrackedWatcherType<T> {
600
    public final TrackedWatcherTypeEnum typeEnum;
601

602
    public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
1✔
603
      this.typeEnum = checkNotNull(typeEnum, "typeEnum");
1✔
604
    }
1✔
605
  }
606

607
  private interface TrackedWatcher<T> {
608
    @Nullable
609
    StatusOr<T> getData();
610

611
    default boolean missingResult() {
612
      return getData() == null;
1✔
613
    }
614

615
    default boolean hasDataValue() {
616
      StatusOr<T> data = getData();
1✔
617
      return data != null && data.hasValue();
1✔
618
    }
619

620
    void close();
621
  }
622

623
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
624
      implements ResourceWatcher<T>, TrackedWatcher<T> {
625
    private final XdsResourceType<T> type;
626
    private final String resourceName;
627
    boolean cancelled;
628

629
    @Nullable
630
    private StatusOr<T> data;
631

632

633
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
634
      this.type = checkNotNull(type, "type");
1✔
635
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
636
    }
1✔
637

638
    @Override
639
    public void onError(Status error) {
640
      checkNotNull(error, "error");
1✔
641
      if (cancelled) {
1✔
642
        return;
×
643
      }
644
      // Don't update configuration on error, if we've already received configuration
645
      if (!hasDataValue()) {
1✔
646
        this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
647
            String.format("Error retrieving %s: %s: %s",
1✔
648
              toContextString(), error.getCode(), error.getDescription())));
1✔
649
        maybePublishConfig();
1✔
650
      }
651
    }
1✔
652

653
    @Override
654
    public void onResourceDoesNotExist(String resourceName) {
655
      if (cancelled) {
1✔
656
        return;
×
657
      }
658

659
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
660
      this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
661
          toContextString() + " does not exist" + nodeInfo()));
1✔
662
      maybePublishConfig();
1✔
663
    }
1✔
664

665
    @Override
666
    public void onChanged(T update) {
667
      checkNotNull(update, "update");
1✔
668
      if (cancelled) {
1✔
669
        return;
1✔
670
      }
671

672
      this.data = StatusOr.fromValue(update);
1✔
673
      subscribeToChildren(update);
1✔
674
      maybePublishConfig();
1✔
675
    }
1✔
676

677
    protected abstract void subscribeToChildren(T update);
678

679
    @Override
680
    public void close() {
681
      cancelled = true;
1✔
682
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
683
    }
1✔
684

685
    @Override
686
    @Nullable
687
    public StatusOr<T> getData() {
688
      return data;
1✔
689
    }
690

691
    public String toContextString() {
692
      return toContextStr(type.typeName(), resourceName);
1✔
693
    }
694
  }
695

696
  private interface RdsUpdateSupplier {
697
    StatusOr<RdsUpdate> getRdsUpdate();
698
  }
699

700
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
701
      implements RdsUpdateSupplier {
702

703
    private LdsWatcher(String resourceName) {
1✔
704
      super(XdsListenerResource.getInstance(), resourceName);
1✔
705
    }
1✔
706

707
    @Override
708
    public void subscribeToChildren(XdsListenerResource.LdsUpdate update) {
709
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
710
      List<VirtualHost> virtualHosts;
711
      if (httpConnectionManager == null) {
1✔
712
        // TCP listener. Unsupported config
713
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
714
      } else {
715
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
716
      }
717
      if (virtualHosts != null) {
1✔
718
        updateRoutes(virtualHosts);
1✔
719
      }
720

721
      String rdsName = getRdsName(update);
1✔
722
      if (rdsName != null) {
1✔
723
        addRdsWatcher(rdsName);
1✔
724
      }
725
    }
1✔
726

727
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
728
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
729
      if (httpConnectionManager == null) {
1✔
730
        // TCP listener. Unsupported config
731
        return null;
1✔
732
      }
733
      return httpConnectionManager.rdsName();
1✔
734
    }
735

736
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
737
      String rdsName = getRdsName(update);
1✔
738
      if (rdsName == null) {
1✔
739
        return null;
×
740
      }
741
      return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
1✔
742
    }
743

744
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
745
      if (!hasDataValue()) {
1✔
746
        return this;
×
747
      }
748
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
749
      if (hcm == null) {
1✔
750
        return this;
1✔
751
      }
752
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
753
      if (virtualHosts != null) {
1✔
754
        return this;
1✔
755
      }
756
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
757
      assert rdsWatcher != null;
1✔
758
      return rdsWatcher;
1✔
759
    }
760

761
    @Override
762
    public StatusOr<RdsUpdate> getRdsUpdate() {
763
      if (missingResult()) {
1✔
764
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
765
      }
766
      if (!getData().hasValue()) {
1✔
767
        return StatusOr.fromStatus(getData().getStatus());
×
768
      }
769
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
770
      if (hcm == null) {
1✔
771
        return StatusOr.fromStatus(
1✔
772
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
773
      }
774
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
775
      if (virtualHosts == null) {
1✔
776
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
777
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
778
        // bug
779
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
780
      }
781
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
782
    }
783
  }
784

785
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
786

787
    public RdsWatcher(String resourceName) {
1✔
788
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
789
    }
1✔
790

791
    @Override
792
    public void subscribeToChildren(RdsUpdate update) {
793
      updateRoutes(update.virtualHosts);
1✔
794
    }
1✔
795

796
    @Override
797
    public StatusOr<RdsUpdate> getRdsUpdate() {
798
      if (missingResult()) {
1✔
799
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
800
      }
801
      return getData();
1✔
802
    }
803
  }
804

805
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
806
    CdsWatcher(String resourceName) {
1✔
807
      super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
808
    }
1✔
809

810
    @Override
811
    public void subscribeToChildren(XdsClusterResource.CdsUpdate update) {
812
      switch (update.clusterType()) {
1✔
813
        case EDS:
814
          addEdsWatcher(getEdsServiceName());
1✔
815
          break;
1✔
816
        case LOGICAL_DNS:
817
          if (enableLogicalDns) {
1✔
818
            addDnsWatcher(update.dnsHostName());
1✔
819
          }
820
          break;
821
        case AGGREGATE:
822
          update.prioritizedClusterNames()
1✔
823
              .forEach(name -> addClusterWatcher(name));
1✔
824
          break;
1✔
825
        default:
826
      }
827
    }
1✔
828

829
    public String getEdsServiceName() {
830
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
831
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
832
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
833
      if (edsServiceName == null) {
1✔
834
        edsServiceName = cdsUpdate.clusterName();
×
835
      }
836
      return edsServiceName;
1✔
837
    }
838
  }
839

840
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
841
    private EdsWatcher(String resourceName) {
1✔
842
      super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
843
    }
1✔
844

845
    @Override
846
    public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
1✔
847
  }
848

849
  private final class DnsWatcher implements TrackedWatcher<List<EquivalentAddressGroup>> {
850
    private final NameResolver resolver;
851
    @Nullable
852
    private StatusOr<List<EquivalentAddressGroup>> data;
853
    private boolean cancelled;
854

855
    public DnsWatcher(String dnsHostName, NameResolver.Args nameResolverArgs) {
1✔
856
      this.resolver = createNameResolver(dnsHostName, nameResolverArgs);
1✔
857
    }
1✔
858

859
    public void start() {
860
      resolver.start(new NameResolverListener());
1✔
861
    }
1✔
862

863
    public void refresh() {
864
      if (cancelled) {
×
865
        return;
×
866
      }
867
      resolver.refresh();
×
868
    }
×
869

870
    @Override
871
    @Nullable
872
    public StatusOr<List<EquivalentAddressGroup>> getData() {
873
      return data;
1✔
874
    }
875

876
    @Override
877
    public void close() {
878
      if (cancelled) {
1✔
879
        return;
×
880
      }
881
      cancelled = true;
1✔
882
      resolver.shutdown();
1✔
883
    }
1✔
884

885
    private class NameResolverListener extends NameResolver.Listener2 {
1✔
886
      @Override
887
      public void onResult(final NameResolver.ResolutionResult resolutionResult) {
888
        syncContext.execute(() -> onResult2(resolutionResult));
×
889
      }
×
890

891
      @Override
892
      public Status onResult2(final NameResolver.ResolutionResult resolutionResult) {
893
        if (cancelled) {
1✔
894
          return Status.OK;
×
895
        }
896
        data = resolutionResult.getAddressesOrError();
1✔
897
        maybePublishConfig();
1✔
898
        return resolutionResult.getAddressesOrError().getStatus();
1✔
899
      }
900

901
      @Override
902
      public void onError(final Status error) {
903
        syncContext.execute(new Runnable() {
1✔
904
          @Override
905
          public void run() {
906
            if (cancelled) {
1✔
907
              return;
×
908
            }
909
            // DnsNameResolver cannot distinguish between address-not-found and transient errors.
910
            // Assume it is a transient error.
911
            // TODO: Once the resolution note API is available, don't throw away the error if
912
            // hasDataValue(); pass it as the note instead
913
            if (!hasDataValue()) {
1✔
914
              data = StatusOr.fromStatus(error);
1✔
915
              maybePublishConfig();
1✔
916
            }
917
          }
1✔
918
        });
919
      }
1✔
920
    }
921
  }
922

923
  private static final class FailingNameResolver extends NameResolver {
924
    private final Status status;
925

926
    public FailingNameResolver(Status status) {
1✔
927
      checkNotNull(status, "status");
1✔
928
      checkArgument(!status.isOk(), "Status must not be OK");
1✔
929
      this.status = status;
1✔
930
    }
1✔
931

932
    @Override
933
    public void start(Listener2 listener) {
934
      listener.onError(status);
1✔
935
    }
1✔
936

937
    @Override
938
    public String getServiceAuthority() {
939
      return "bug-if-you-see-this-authority";
×
940
    }
941

942
    @Override
943
    public void shutdown() {}
1✔
944
  }
945
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc