• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20243

16 Apr 2026 05:25PM UTC coverage: 88.811% (+0.01%) from 88.8%
#20243

push

github

ejona86
xds: Propagate status cause through XdsDepManager

Often there is no cause, but connect(), channel credentials, and call
credentials failures on the control plane RPC can include a useful
causal exception.

This was triggered by seeing an error like below, but it didn't include
the cause, which would have included HTTP error information from the
failure fetching the credential.
```
UNAVAILABLE: Error retrieving LDS resource xdstp://traffic-director-c2p.xds.googleapis.com/envoy.config.listener.v3.Listener/bigtable.googleapis.com: UNAUTHENTICATED: Failed computing credential metadata nodeID: C2P-798500073
```

36019 of 40557 relevant lines covered (88.81%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.09
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import io.grpc.EquivalentAddressGroup;
28
import io.grpc.NameResolver;
29
import io.grpc.NameResolverProvider;
30
import io.grpc.Status;
31
import io.grpc.StatusOr;
32
import io.grpc.SynchronizationContext;
33
import io.grpc.internal.GrpcUtil;
34
import io.grpc.internal.RetryingNameResolver;
35
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
36
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
37
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
38
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
39
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
40
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
41
import io.grpc.xds.client.Locality;
42
import io.grpc.xds.client.XdsClient;
43
import io.grpc.xds.client.XdsClient.ResourceWatcher;
44
import io.grpc.xds.client.XdsResourceType;
45
import java.net.SocketAddress;
46
import java.net.URI;
47
import java.net.URISyntaxException;
48
import java.util.ArrayList;
49
import java.util.Collections;
50
import java.util.EnumMap;
51
import java.util.HashMap;
52
import java.util.HashSet;
53
import java.util.LinkedHashSet;
54
import java.util.List;
55
import java.util.Map;
56
import java.util.Objects;
57
import java.util.Set;
58
import javax.annotation.Nullable;
59

60
/**
61
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
62
 * maintains the watchers for the xds resources and when an update is received, it either requests
63
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
64
 * applies to a single data plane authority.
65
 */
66
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
67
  private enum TrackedWatcherTypeEnum {
1✔
68
    LDS, RDS, CDS, EDS, DNS
1✔
69
  }
70

71
  private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
1✔
72
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
73
  private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
1✔
74
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
75
  private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
1✔
76
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
77
  private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
1✔
78
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
79
  private static final TrackedWatcherType<List<EquivalentAddressGroup>> DNS_TYPE =
1✔
80
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.DNS);
81

82
  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
83
  // to an empty locality.
84
  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
1✔
85

86
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
87

88
  static boolean enableLogicalDns = true;
1✔
89

90
  private final String listenerName;
91
  private final XdsClient xdsClient;
92
  private final SynchronizationContext syncContext;
93
  private final String dataPlaneAuthority;
94
  private final NameResolver.Args nameResolverArgs;
95
  private XdsConfigWatcher xdsConfigWatcher;
96

97
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
98
  private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
1✔
99
      new EnumMap<>(TrackedWatcherTypeEnum.class);
100
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
101

102
  XdsDependencyManager(
103
      XdsClient xdsClient,
104
      SynchronizationContext syncContext,
105
      String dataPlaneAuthority,
106
      String listenerName,
107
      NameResolver.Args nameResolverArgs) {
1✔
108
    this.listenerName = checkNotNull(listenerName, "listenerName");
1✔
109
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
110
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
111
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
112
    this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
113
  }
1✔
114

115
  public static String toContextStr(String typeName, String resourceName) {
116
    return typeName + " resource " + resourceName;
1✔
117
  }
118

119
  public void start(XdsConfigWatcher xdsConfigWatcher) {
120
    checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
1✔
121
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
122
    // start the ball rolling
123
    syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
1✔
124
  }
1✔
125

126
  @Override
127
  public XdsConfig.Subscription subscribeToCluster(String clusterName) {
128
    checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
1✔
129
    checkNotNull(clusterName, "clusterName");
1✔
130
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
131

132
    syncContext.execute(() -> {
1✔
133
      if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
134
        subscription.closed = true;
1✔
135
        return; // shutdown() called
1✔
136
      }
137
      subscriptions.add(subscription);
1✔
138
      addClusterWatcher(clusterName);
1✔
139
    });
1✔
140

141
    return subscription;
1✔
142
  }
143

144
  /**
145
   * For all logical dns clusters refresh their results.
146
   */
147
  public void requestReresolution() {
148
    syncContext.execute(() -> {
1✔
149
      for (TrackedWatcher<List<EquivalentAddressGroup>> watcher : getWatchers(DNS_TYPE).values()) {
1✔
150
        DnsWatcher dnsWatcher = (DnsWatcher) watcher;
1✔
151
        dnsWatcher.refresh();
1✔
152
      }
1✔
153
    });
1✔
154
  }
1✔
155

156
  private <T extends ResourceUpdate> void addWatcher(
157
      TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
158
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
159
    XdsResourceType<T> type = watcher.type;
1✔
160
    String resourceName = watcher.resourceName;
1✔
161

162
    getWatchers(watcherType).put(resourceName, watcher);
1✔
163
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
164
  }
1✔
165

166
  public void shutdown() {
167
    syncContext.execute(() -> {
1✔
168
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
169
        for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
1✔
170
          watcher.close();
1✔
171
        }
1✔
172
      }
1✔
173
      resourceWatchers.clear();
1✔
174
      subscriptions.clear();
1✔
175
    });
1✔
176
  }
1✔
177

178
  private void releaseSubscription(ClusterSubscription subscription) {
179
    checkNotNull(subscription, "subscription");
1✔
180
    syncContext.execute(() -> {
1✔
181
      if (subscription.closed) {
1✔
182
        return;
1✔
183
      }
184
      subscription.closed = true;
1✔
185
      if (!subscriptions.remove(subscription)) {
1✔
186
        return; // shutdown() called
×
187
      }
188
      maybePublishConfig();
1✔
189
    });
1✔
190
  }
1✔
191

192
  /**
193
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
194
   * the watchers.
195
   */
196
  private void maybePublishConfig() {
197
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
198
    if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
199
      return; // shutdown() called
×
200
    }
201
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
202
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
203
        .anyMatch(TrackedWatcher::missingResult);
1✔
204
    if (waitingOnResource) {
1✔
205
      return;
1✔
206
    }
207

208
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
209
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
210
      return;
1✔
211
    }
212
    assert newUpdate.hasValue()
1✔
213
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
214
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
215
    lastUpdate = newUpdate;
1✔
216
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
217
  }
1✔
218

219
  @VisibleForTesting
220
  StatusOr<XdsConfig> buildUpdate() {
221
    // Create a config and discard any watchers not accessed
222
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
223
    StatusOr<XdsConfig> config = buildUpdate(
1✔
224
        tracer, listenerName, dataPlaneAuthority, subscriptions);
225
    tracer.closeUnusedWatchers();
1✔
226
    return config;
1✔
227
  }
228

229
  private static StatusOr<XdsConfig> buildUpdate(
230
      WatcherTracer tracer,
231
      String listenerName,
232
      String dataPlaneAuthority,
233
      Set<ClusterSubscription> subscriptions) {
234
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
235

236
    // Iterate watchers and build the XdsConfig
237

238
    TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
239
        = tracer.getWatcher(LDS_TYPE, listenerName);
1✔
240
    if (ldsWatcher == null) {
1✔
241
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
242
          "Bug: No listener watcher found for " + listenerName));
243
    }
244
    if (!ldsWatcher.getData().hasValue()) {
1✔
245
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
246
    }
247
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
248
    builder.setListener(ldsUpdate);
1✔
249

250
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
251
    if (routeSource == null) {
1✔
252
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
253
          "Bug: No route source found for listener " + dataPlaneAuthority));
254
    }
255
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
256
    if (!statusOrRdsUpdate.hasValue()) {
1✔
257
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
258
    }
259
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
260
    builder.setRoute(rdsUpdate);
1✔
261

262
    VirtualHost activeVirtualHost =
1✔
263
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
264
    if (activeVirtualHost == null) {
1✔
265
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
266
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
267
    }
268
    builder.setVirtualHost(activeVirtualHost);
1✔
269

270
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
271
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
272
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
273
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
274
    }
1✔
275
    for (ClusterSubscription subscription : subscriptions) {
1✔
276
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
277
    }
1✔
278
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
279
      builder.addCluster(me.getKey(), me.getValue());
1✔
280
    }
1✔
281

282
    return StatusOr.fromValue(builder.build());
1✔
283
  }
284

285
  private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
286
    TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
287
    if (typeWatchers == null) {
1✔
288
      typeWatchers = new TypeWatchers<T>(watcherType);
1✔
289
      resourceWatchers.put(watcherType.typeEnum, typeWatchers);
1✔
290
    }
291
    assert typeWatchers.watcherType == watcherType;
1✔
292
    @SuppressWarnings("unchecked")
293
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
294
    return tTypeWatchers.watchers;
1✔
295
  }
296

297
  private static void addConfigForCluster(
298
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
299
      String clusterName,
300
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
301
      LinkedHashSet<String> ancestors,
302
      WatcherTracer tracer) {
303
    if (clusters.containsKey(clusterName)) {
1✔
304
      return;
1✔
305
    }
306
    if (ancestors.contains(clusterName)) {
1✔
307
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
308
          Status.INTERNAL.withDescription(
1✔
309
              "Aggregate cluster cycle detected: " + ancestors)));
310
      return;
1✔
311
    }
312
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
313
      clusters.put(clusterName, StatusOr.fromStatus(
×
314
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
315
      return;
×
316
    }
317

318
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
1✔
319
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
320
    if (!cdsWatcherDataOr.hasValue()) {
1✔
321
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
322
      return;
1✔
323
    }
324

325
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
326
    XdsConfig.XdsClusterConfig.ClusterChild child;
327
    switch (cdsUpdate.clusterType()) {
1✔
328
      case AGGREGATE:
329
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
330
        // preserves the priority across all aggregate clusters
331
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
332
        ancestors.add(clusterName);
1✔
333
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
334
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
335
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
336
          if (!config.hasValue()) {
1✔
337
            // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
338
            // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
339
            // watchers reports a transient ADS stream error, the policy should report that it is in
340
            // TRANSIENT_FAILURE if it has never passed a config to its child.
341
            //
342
            // But there's currently disagreement about whether that is actually what we want, and
343
            // that was not originally implemented in gRPC Java. So we're keeping Java's old
344
            // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
345
            // cycle).
346
            leafNames.add(childCluster);
1✔
347
            continue;
1✔
348
          }
349
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
350
          if (children instanceof AggregateConfig) {
1✔
351
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
352
          } else {
353
            leafNames.add(childCluster);
1✔
354
          }
355
        }
1✔
356
        ancestors.remove(clusterName);
1✔
357

358
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
359
        break;
1✔
360
      case EDS:
361
        TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
362
            tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
1✔
363
        if (edsWatcher != null) {
1✔
364
          child = new EndpointConfig(edsWatcher.getData());
1✔
365
        } else {
366
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
367
              "EDS resource not found for cluster " + clusterName)));
368
        }
369
        break;
×
370
      case LOGICAL_DNS:
371
        if (enableLogicalDns) {
1✔
372
          TrackedWatcher<List<EquivalentAddressGroup>> dnsWatcher =
1✔
373
              tracer.getWatcher(DNS_TYPE, cdsUpdate.dnsHostName());
1✔
374
          child = new EndpointConfig(dnsToEdsUpdate(dnsWatcher.getData(), cdsUpdate.dnsHostName()));
1✔
375
        } else {
1✔
376
          child = new EndpointConfig(StatusOr.fromStatus(
×
377
              Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
×
378
        }
379
        break;
×
380
      default:
381
        child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
382
              "Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
×
383
    }
384
    if (clusters.containsKey(clusterName)) {
1✔
385
      // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
386
      // present. We don't want to overwrite it with a non-error value.
387
      return;
1✔
388
    }
389
    clusters.put(clusterName, StatusOr.fromValue(
1✔
390
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
391
  }
1✔
392

393
  private static StatusOr<XdsEndpointResource.EdsUpdate> dnsToEdsUpdate(
394
      StatusOr<List<EquivalentAddressGroup>> dnsData, String dnsHostName) {
395
    if (!dnsData.hasValue()) {
1✔
396
      return StatusOr.fromStatus(dnsData.getStatus());
1✔
397
    }
398

399
    List<SocketAddress> addresses = new ArrayList<>();
1✔
400
    for (EquivalentAddressGroup eag : dnsData.getValue()) {
1✔
401
      addresses.addAll(eag.getAddresses());
1✔
402
    }
1✔
403
    EquivalentAddressGroup eag = new EquivalentAddressGroup(addresses);
1✔
404
    List<Endpoints.LbEndpoint> endpoints = ImmutableList.of(
1✔
405
        Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of()));
1✔
406
    LocalityLbEndpoints lbEndpoints =
1✔
407
        LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of());
1✔
408
    return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate(
1✔
409
        "fakeEds_logicalDns",
410
        Collections.singletonMap(LOGICAL_DNS_CLUSTER_LOCALITY, lbEndpoints),
1✔
411
        new ArrayList<>()));
412
  }
413

414
  private void addRdsWatcher(String resourceName) {
415
    if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
1✔
416
      return;
×
417
    }
418

419
    addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
1✔
420
  }
1✔
421

422
  private void addEdsWatcher(String edsServiceName) {
423
    if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
1✔
424
      return;
1✔
425
    }
426

427
    addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
1✔
428
  }
1✔
429

430
  private void addClusterWatcher(String clusterName) {
431
    if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
1✔
432
      return;
1✔
433
    }
434

435
    addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
1✔
436
  }
1✔
437

438
  private void addDnsWatcher(String dnsHostName) {
439
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
440
    if (getWatchers(DNS_TYPE).containsKey(dnsHostName)) {
1✔
441
      return;
×
442
    }
443

444
    DnsWatcher watcher = new DnsWatcher(dnsHostName, nameResolverArgs);
1✔
445
    getWatchers(DNS_TYPE).put(dnsHostName, watcher);
1✔
446
    watcher.start();
1✔
447
  }
1✔
448

449
  private void updateRoutes(List<VirtualHost> virtualHosts) {
450
    VirtualHost virtualHost =
1✔
451
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
452
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
453
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
454
  }
1✔
455

456
  private String nodeInfo() {
457
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
458
  }
459

460
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
461
    if (virtualHost == null) {
1✔
462
      return Collections.emptySet();
1✔
463
    }
464

465
    // Get all cluster names to which requests can be routed through the virtual host.
466
    Set<String> clusters = new HashSet<>();
1✔
467
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
468
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
469
      if (action == null) {
1✔
470
        continue;
1✔
471
      }
472
      if (action.cluster() != null) {
1✔
473
        clusters.add(action.cluster());
1✔
474
      } else if (action.weightedClusters() != null) {
1✔
475
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
476
          clusters.add(weighedCluster.name());
1✔
477
        }
1✔
478
      }
479
    }
1✔
480

481
    return clusters;
1✔
482
  }
483

484
  private static NameResolver createNameResolver(
485
      String dnsHostName,
486
      NameResolver.Args nameResolverArgs) {
487
    URI uri;
488
    try {
489
      uri = new URI("dns", "", "/" + dnsHostName, null);
1✔
490
    } catch (URISyntaxException e) {
×
491
      return new FailingNameResolver(
×
492
          Status.INTERNAL.withDescription("Bug, invalid URI creation: " + dnsHostName)
×
493
            .withCause(e));
×
494
    }
1✔
495

496
    NameResolverProvider provider =
1✔
497
        nameResolverArgs.getNameResolverRegistry().getProviderForScheme("dns");
1✔
498
    if (provider == null) {
1✔
499
      return new FailingNameResolver(
1✔
500
          Status.INTERNAL.withDescription("Could not find dns name resolver"));
1✔
501
    }
502

503
    NameResolver bareResolver = provider.newNameResolver(uri, nameResolverArgs);
1✔
504
    if (bareResolver == null) {
1✔
505
      return new FailingNameResolver(
×
506
          Status.INTERNAL.withDescription("DNS name resolver provider returned null: " + uri));
×
507
    }
508
    return RetryingNameResolver.wrap(bareResolver, nameResolverArgs);
1✔
509
  }
510

511
  private static class TypeWatchers<T> {
512
    // Key is resource name
513
    final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
1✔
514
    final TrackedWatcherType<T> watcherType;
515

516
    TypeWatchers(TrackedWatcherType<T> watcherType) {
1✔
517
      this.watcherType = checkNotNull(watcherType, "watcherType");
1✔
518
    }
1✔
519
  }
520

521
  public interface XdsConfigWatcher {
522
    /**
523
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
524
     * INTERNAL.
525
     */
526
    void onUpdate(StatusOr<XdsConfig> config);
527
  }
528

529
  private final class ClusterSubscription implements XdsConfig.Subscription {
530
    private final String clusterName;
531
    boolean closed; // Accessed from syncContext
532

533
    public ClusterSubscription(String clusterName) {
1✔
534
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
535
    }
1✔
536

537
    String getClusterName() {
538
      return clusterName;
1✔
539
    }
540

541
    @Override
542
    public void close() {
543
      releaseSubscription(this);
1✔
544
    }
1✔
545
  }
546

547
  /** State for tracing garbage collector. */
548
  private static final class WatcherTracer {
1✔
549
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
550
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
551

552
    public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
1✔
553
      this.resourceWatchers = resourceWatchers;
1✔
554

555
      this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
1✔
556
      for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
1✔
557
        usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
1✔
558
      }
1✔
559
    }
1✔
560

561
    private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
562
      return new TypeWatchers<T>(type);
1✔
563
    }
564

565
    public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
566
      TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
567
      if (typeWatchers == null) {
1✔
568
        return null;
×
569
      }
570
      assert typeWatchers.watcherType == watcherType;
1✔
571
      @SuppressWarnings("unchecked")
572
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
573
      TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
1✔
574
      if (watcher == null) {
1✔
575
        return null;
×
576
      }
577
      @SuppressWarnings("unchecked")
578
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
1✔
579
      usedTypeWatchers.watchers.put(name, watcher);
1✔
580
      return watcher;
1✔
581
    }
582

583
    /** Shut down unused watchers. */
584
    public void closeUnusedWatchers() {
585
      boolean changed = false; // Help out the GC by preferring old objects
1✔
586
      for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
1✔
587
        TypeWatchers<?> orig = resourceWatchers.get(key);
1✔
588
        TypeWatchers<?> used = usedWatchers.get(key);
1✔
589
        for (String name : orig.watchers.keySet()) {
1✔
590
          if (used.watchers.containsKey(name)) {
1✔
591
            continue;
1✔
592
          }
593
          orig.watchers.get(name).close();
1✔
594
          changed = true;
1✔
595
        }
1✔
596
      }
1✔
597
      if (changed) {
1✔
598
        resourceWatchers.putAll(usedWatchers);
1✔
599
      }
600
    }
1✔
601
  }
602

603
  @SuppressWarnings("UnusedTypeParameter")
604
  private static final class TrackedWatcherType<T> {
605
    public final TrackedWatcherTypeEnum typeEnum;
606

607
    public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
1✔
608
      this.typeEnum = checkNotNull(typeEnum, "typeEnum");
1✔
609
    }
1✔
610
  }
611

612
  private interface TrackedWatcher<T> {
613
    @Nullable
614
    StatusOr<T> getData();
615

616
    default boolean missingResult() {
617
      return getData() == null;
1✔
618
    }
619

620
    default boolean hasDataValue() {
621
      StatusOr<T> data = getData();
1✔
622
      return data != null && data.hasValue();
1✔
623
    }
624

625
    void close();
626
  }
627

628
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
629
      implements ResourceWatcher<T>, TrackedWatcher<T> {
630
    private final XdsResourceType<T> type;
631
    private final String resourceName;
632
    boolean cancelled;
633

634
    @Nullable
635
    private StatusOr<T> data;
636
    @Nullable
637
    @SuppressWarnings("unused")
638
    private Status ambientError;
639

640

641
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
642
      this.type = checkNotNull(type, "type");
1✔
643
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
644
    }
1✔
645

646
    @Override
647
    public void onResourceChanged(StatusOr<T> update) {
648
      if (cancelled) {
1✔
649
        return;
1✔
650
      }
651
      ambientError = null;
1✔
652
      if (update.hasValue()) {
1✔
653
        data = update;
1✔
654
        subscribeToChildren(update.getValue());
1✔
655
      } else {
656
        Status translatedStatus = GrpcUtil.statusWithDetails(
1✔
657
            Status.Code.UNAVAILABLE,
658
            "Error retrieving " + toContextString() + nodeInfo(),
1✔
659
            update.getStatus());
1✔
660

661
        data = StatusOr.fromStatus(translatedStatus);
1✔
662
      }
663
      maybePublishConfig();
1✔
664
    }
1✔
665

666
    @Override
667
    public void onAmbientError(Status error) {
668
      if (cancelled) {
1✔
669
        return;
×
670
      }
671
      ambientError = error.withDescription(
1✔
672
          String.format("Ambient error for %s: %s. Details: %s%s",
1✔
673
              toContextString(),
1✔
674
              error.getCode(),
1✔
675
              error.getDescription() != null ? error.getDescription() : "",
1✔
676
              nodeInfo()));
1✔
677
    }
1✔
678

679
    protected abstract void subscribeToChildren(T update);
680

681
    @Override
682
    public void close() {
683
      cancelled = true;
1✔
684
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
685
    }
1✔
686

687
    @Override
688
    @Nullable
689
    public StatusOr<T> getData() {
690
      return data;
1✔
691
    }
692

693
    public String toContextString() {
694
      return toContextStr(type.typeName(), resourceName);
1✔
695
    }
696
  }
697

698
  private interface RdsUpdateSupplier {
699
    StatusOr<RdsUpdate> getRdsUpdate();
700
  }
701

702
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
703
      implements RdsUpdateSupplier {
704

705
    private LdsWatcher(String resourceName) {
1✔
706
      super(XdsListenerResource.getInstance(), resourceName);
1✔
707
    }
1✔
708

709
    @Override
710
    public void subscribeToChildren(XdsListenerResource.LdsUpdate update) {
711
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
712
      List<VirtualHost> virtualHosts;
713
      if (httpConnectionManager == null) {
1✔
714
        // TCP listener. Unsupported config
715
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
716
      } else {
717
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
718
      }
719
      if (virtualHosts != null) {
1✔
720
        updateRoutes(virtualHosts);
1✔
721
      }
722

723
      String rdsName = getRdsName(update);
1✔
724
      if (rdsName != null) {
1✔
725
        addRdsWatcher(rdsName);
1✔
726
      }
727
    }
1✔
728

729
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
730
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
731
      if (httpConnectionManager == null) {
1✔
732
        // TCP listener. Unsupported config
733
        return null;
1✔
734
      }
735
      return httpConnectionManager.rdsName();
1✔
736
    }
737

738
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
739
      String rdsName = getRdsName(update);
1✔
740
      if (rdsName == null) {
1✔
741
        return null;
×
742
      }
743
      return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
1✔
744
    }
745

746
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
747
      if (!hasDataValue()) {
1✔
748
        return this;
×
749
      }
750
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
751
      if (hcm == null) {
1✔
752
        return this;
1✔
753
      }
754
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
755
      if (virtualHosts != null) {
1✔
756
        return this;
1✔
757
      }
758
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
759
      assert rdsWatcher != null;
1✔
760
      return rdsWatcher;
1✔
761
    }
762

763
    @Override
764
    public StatusOr<RdsUpdate> getRdsUpdate() {
765
      if (missingResult()) {
1✔
766
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
767
      }
768
      if (!getData().hasValue()) {
1✔
769
        return StatusOr.fromStatus(getData().getStatus());
×
770
      }
771
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
772
      if (hcm == null) {
1✔
773
        return StatusOr.fromStatus(
1✔
774
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
775
      }
776
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
777
      if (virtualHosts == null) {
1✔
778
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
779
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
780
        // bug
781
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
782
      }
783
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
784
    }
785
  }
786

787
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
788

789
    public RdsWatcher(String resourceName) {
1✔
790
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
791
    }
1✔
792

793
    @Override
794
    public void subscribeToChildren(RdsUpdate update) {
795
      updateRoutes(update.virtualHosts);
1✔
796
    }
1✔
797

798
    @Override
799
    public StatusOr<RdsUpdate> getRdsUpdate() {
800
      if (missingResult()) {
1✔
801
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
802
      }
803
      return getData();
1✔
804
    }
805
  }
806

807
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
808
    CdsWatcher(String resourceName) {
1✔
809
      super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
810
    }
1✔
811

812
    @Override
813
    public void subscribeToChildren(XdsClusterResource.CdsUpdate update) {
814
      switch (update.clusterType()) {
1✔
815
        case EDS:
816
          addEdsWatcher(getEdsServiceName());
1✔
817
          break;
1✔
818
        case LOGICAL_DNS:
819
          if (enableLogicalDns) {
1✔
820
            addDnsWatcher(update.dnsHostName());
1✔
821
          }
822
          break;
823
        case AGGREGATE:
824
          update.prioritizedClusterNames()
1✔
825
              .forEach(name -> addClusterWatcher(name));
1✔
826
          break;
1✔
827
        default:
828
      }
829
    }
1✔
830

831
    public String getEdsServiceName() {
832
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
833
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
834
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
835
      if (edsServiceName == null) {
1✔
836
        edsServiceName = cdsUpdate.clusterName();
×
837
      }
838
      return edsServiceName;
1✔
839
    }
840
  }
841

842
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
843
    private EdsWatcher(String resourceName) {
1✔
844
      super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
845
    }
1✔
846

847
    @Override
848
    public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
1✔
849
  }
850

851
  private final class DnsWatcher implements TrackedWatcher<List<EquivalentAddressGroup>> {
852
    private final NameResolver resolver;
853
    @Nullable
854
    private StatusOr<List<EquivalentAddressGroup>> data;
855
    private boolean cancelled;
856

857
    public DnsWatcher(String dnsHostName, NameResolver.Args nameResolverArgs) {
1✔
858
      this.resolver = createNameResolver(dnsHostName, nameResolverArgs);
1✔
859
    }
1✔
860

861
    public void start() {
862
      resolver.start(new NameResolverListener());
1✔
863
    }
1✔
864

865
    public void refresh() {
866
      if (cancelled) {
1✔
867
        return;
×
868
      }
869
      resolver.refresh();
1✔
870
    }
1✔
871

872
    @Override
873
    @Nullable
874
    public StatusOr<List<EquivalentAddressGroup>> getData() {
875
      return data;
1✔
876
    }
877

878
    @Override
879
    public void close() {
880
      if (cancelled) {
1✔
881
        return;
×
882
      }
883
      cancelled = true;
1✔
884
      resolver.shutdown();
1✔
885
    }
1✔
886

887
    private class NameResolverListener extends NameResolver.Listener2 {
1✔
888
      @Override
889
      public void onResult(final NameResolver.ResolutionResult resolutionResult) {
890
        syncContext.execute(() -> onResult2(resolutionResult));
×
891
      }
×
892

893
      @Override
894
      public Status onResult2(final NameResolver.ResolutionResult resolutionResult) {
895
        if (cancelled) {
1✔
896
          return Status.OK;
×
897
        }
898
        data = resolutionResult.getAddressesOrError();
1✔
899
        maybePublishConfig();
1✔
900
        return resolutionResult.getAddressesOrError().getStatus();
1✔
901
      }
902

903
      @Override
904
      public void onError(final Status error) {
905
        syncContext.execute(new Runnable() {
1✔
906
          @Override
907
          public void run() {
908
            if (cancelled) {
1✔
909
              return;
×
910
            }
911
            // DnsNameResolver cannot distinguish between address-not-found and transient errors.
912
            // Assume it is a transient error.
913
            // TODO: Once the resolution note API is available, don't throw away the error if
914
            // hasDataValue(); pass it as the note instead
915
            if (!hasDataValue()) {
1✔
916
              data = StatusOr.fromStatus(error);
1✔
917
              maybePublishConfig();
1✔
918
            }
919
          }
1✔
920
        });
921
      }
1✔
922
    }
923
  }
924

925
  private static final class FailingNameResolver extends NameResolver {
926
    private final Status status;
927

928
    public FailingNameResolver(Status status) {
1✔
929
      checkNotNull(status, "status");
1✔
930
      checkArgument(!status.isOk(), "Status must not be OK");
1✔
931
      this.status = status;
1✔
932
    }
1✔
933

934
    @Override
935
    public void start(Listener2 listener) {
936
      listener.onError(status);
1✔
937
    }
1✔
938

939
    @Override
940
    public String getServiceAuthority() {
941
      return "bug-if-you-see-this-authority";
×
942
    }
943

944
    @Override
945
    public void shutdown() {}
1✔
946
  }
947
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc