• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19986

18 Sep 2025 06:08PM UTC coverage: 88.539% (-0.008%) from 88.547%
#19986

push

github

web-flow
xds: Convert ClusterResolverLb to XdsDepManager

No longer need to hard-code pick_first because of gRFC A61.
https://github.com/grpc/proposal/pull/477

34664 of 39151 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import io.grpc.EquivalentAddressGroup;
28
import io.grpc.NameResolver;
29
import io.grpc.NameResolverProvider;
30
import io.grpc.Status;
31
import io.grpc.StatusOr;
32
import io.grpc.SynchronizationContext;
33
import io.grpc.internal.RetryingNameResolver;
34
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
35
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
36
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
37
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
38
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
39
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
40
import io.grpc.xds.client.Locality;
41
import io.grpc.xds.client.XdsClient;
42
import io.grpc.xds.client.XdsClient.ResourceWatcher;
43
import io.grpc.xds.client.XdsResourceType;
44
import java.net.SocketAddress;
45
import java.net.URI;
46
import java.net.URISyntaxException;
47
import java.util.ArrayList;
48
import java.util.Collections;
49
import java.util.EnumMap;
50
import java.util.HashMap;
51
import java.util.HashSet;
52
import java.util.LinkedHashSet;
53
import java.util.List;
54
import java.util.Map;
55
import java.util.Objects;
56
import java.util.Set;
57
import javax.annotation.Nullable;
58

59
/**
60
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
61
 * maintains the watchers for the xds resources and when an update is received, it either requests
62
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
63
 * applies to a single data plane authority.
64
 */
65
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
66
  private enum TrackedWatcherTypeEnum {
1✔
67
    LDS, RDS, CDS, EDS, DNS
1✔
68
  }
69

70
  private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
1✔
71
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
72
  private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
1✔
73
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
74
  private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
1✔
75
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
76
  private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
1✔
77
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
78
  private static final TrackedWatcherType<List<EquivalentAddressGroup>> DNS_TYPE =
1✔
79
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.DNS);
80

81
  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
82
  // to an empty locality.
83
  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
1✔
84

85
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
86

87
  static boolean enableLogicalDns = true;
1✔
88

89
  private final String listenerName;
90
  private final XdsClient xdsClient;
91
  private final SynchronizationContext syncContext;
92
  private final String dataPlaneAuthority;
93
  private final NameResolver.Args nameResolverArgs;
94
  private XdsConfigWatcher xdsConfigWatcher;
95

96
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
97
  private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
1✔
98
      new EnumMap<>(TrackedWatcherTypeEnum.class);
99
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
100

101
  /**
   * Creates a manager; no watches are registered until {@link #start} is called.
   *
   * @param xdsClient client used to watch and cancel xDS resources
   * @param syncContext context in which all watcher state is read and mutated
   * @param dataPlaneAuthority host name used to select the virtual host from the route config
   * @param listenerName name of the LDS resource that is watched first
   * @param nameResolverArgs arguments handed to the DNS watchers created for logical DNS clusters
   * @throws NullPointerException if any argument is null
   */
  XdsDependencyManager(
      XdsClient xdsClient,
      SynchronizationContext syncContext,
      String dataPlaneAuthority,
      String listenerName,
      NameResolver.Args nameResolverArgs) {
    // The checks run in this order so that the first null argument reported stays the same.
    this.listenerName = checkNotNull(listenerName, "listenerName");
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
    this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
  }
1✔
113

114
  public static String toContextStr(String typeName, String resourceName) {
115
    return typeName + " resource " + resourceName;
1✔
116
  }
117

118
  public void start(XdsConfigWatcher xdsConfigWatcher) {
119
    checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
1✔
120
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
121
    // start the ball rolling
122
    syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
1✔
123
  }
1✔
124

125
  @Override
126
  public XdsConfig.Subscription subscribeToCluster(String clusterName) {
127
    checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
1✔
128
    checkNotNull(clusterName, "clusterName");
1✔
129
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
130

131
    syncContext.execute(() -> {
1✔
132
      if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
133
        subscription.closed = true;
1✔
134
        return; // shutdown() called
1✔
135
      }
136
      subscriptions.add(subscription);
1✔
137
      addClusterWatcher(clusterName);
1✔
138
    });
1✔
139

140
    return subscription;
1✔
141
  }
142

143
  /**
144
   * For all logical dns clusters refresh their results.
145
   */
146
  public void requestReresolution() {
147
    syncContext.execute(() -> {
1✔
148
      for (TrackedWatcher<List<EquivalentAddressGroup>> watcher : getWatchers(DNS_TYPE).values()) {
1✔
149
        DnsWatcher dnsWatcher = (DnsWatcher) watcher;
1✔
150
        dnsWatcher.refresh();
1✔
151
      }
1✔
152
    });
1✔
153
  }
1✔
154

155
  private <T extends ResourceUpdate> void addWatcher(
156
      TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
157
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
158
    XdsResourceType<T> type = watcher.type;
1✔
159
    String resourceName = watcher.resourceName;
1✔
160

161
    getWatchers(watcherType).put(resourceName, watcher);
1✔
162
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
163
  }
1✔
164

165
  public void shutdown() {
166
    syncContext.execute(() -> {
1✔
167
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
168
        for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
1✔
169
          watcher.close();
1✔
170
        }
1✔
171
      }
1✔
172
      resourceWatchers.clear();
1✔
173
      subscriptions.clear();
1✔
174
    });
1✔
175
  }
1✔
176

177
  private void releaseSubscription(ClusterSubscription subscription) {
178
    checkNotNull(subscription, "subscription");
1✔
179
    syncContext.execute(() -> {
1✔
180
      if (subscription.closed) {
1✔
181
        return;
1✔
182
      }
183
      subscription.closed = true;
1✔
184
      if (!subscriptions.remove(subscription)) {
1✔
185
        return; // shutdown() called
×
186
      }
187
      maybePublishConfig();
1✔
188
    });
1✔
189
  }
1✔
190

191
  /**
192
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
193
   * the watchers.
194
   */
195
  private void maybePublishConfig() {
196
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
197
    if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
198
      return; // shutdown() called
×
199
    }
200
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
201
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
202
        .anyMatch(TrackedWatcher::missingResult);
1✔
203
    if (waitingOnResource) {
1✔
204
      return;
1✔
205
    }
206

207
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
208
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
209
      return;
1✔
210
    }
211
    assert newUpdate.hasValue()
1✔
212
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
213
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
214
    lastUpdate = newUpdate;
1✔
215
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
216
  }
1✔
217

218
  @VisibleForTesting
219
  StatusOr<XdsConfig> buildUpdate() {
220
    // Create a config and discard any watchers not accessed
221
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
222
    StatusOr<XdsConfig> config = buildUpdate(
1✔
223
        tracer, listenerName, dataPlaneAuthority, subscriptions);
224
    tracer.closeUnusedWatchers();
1✔
225
    return config;
1✔
226
  }
227

228
  private static StatusOr<XdsConfig> buildUpdate(
229
      WatcherTracer tracer,
230
      String listenerName,
231
      String dataPlaneAuthority,
232
      Set<ClusterSubscription> subscriptions) {
233
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
234

235
    // Iterate watchers and build the XdsConfig
236

237
    TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
238
        = tracer.getWatcher(LDS_TYPE, listenerName);
1✔
239
    if (ldsWatcher == null) {
1✔
240
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
241
          "Bug: No listener watcher found for " + listenerName));
242
    }
243
    if (!ldsWatcher.getData().hasValue()) {
1✔
244
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
245
    }
246
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
247
    builder.setListener(ldsUpdate);
1✔
248

249
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
250
    if (routeSource == null) {
1✔
251
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
252
          "Bug: No route source found for listener " + dataPlaneAuthority));
253
    }
254
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
255
    if (!statusOrRdsUpdate.hasValue()) {
1✔
256
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
257
    }
258
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
259
    builder.setRoute(rdsUpdate);
1✔
260

261
    VirtualHost activeVirtualHost =
1✔
262
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
263
    if (activeVirtualHost == null) {
1✔
264
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
265
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
266
    }
267
    builder.setVirtualHost(activeVirtualHost);
1✔
268

269
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
270
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
271
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
272
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
273
    }
1✔
274
    for (ClusterSubscription subscription : subscriptions) {
1✔
275
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
276
    }
1✔
277
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
278
      builder.addCluster(me.getKey(), me.getValue());
1✔
279
    }
1✔
280

281
    return StatusOr.fromValue(builder.build());
1✔
282
  }
283

284
  private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
285
    TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
286
    if (typeWatchers == null) {
1✔
287
      typeWatchers = new TypeWatchers<T>(watcherType);
1✔
288
      resourceWatchers.put(watcherType.typeEnum, typeWatchers);
1✔
289
    }
290
    assert typeWatchers.watcherType == watcherType;
1✔
291
    @SuppressWarnings("unchecked")
292
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
293
    return tTypeWatchers.watchers;
1✔
294
  }
295

296
  private static void addConfigForCluster(
297
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
298
      String clusterName,
299
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
300
      LinkedHashSet<String> ancestors,
301
      WatcherTracer tracer) {
302
    if (clusters.containsKey(clusterName)) {
1✔
303
      return;
1✔
304
    }
305
    if (ancestors.contains(clusterName)) {
1✔
306
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
307
          Status.INTERNAL.withDescription(
1✔
308
              "Aggregate cluster cycle detected: " + ancestors)));
309
      return;
1✔
310
    }
311
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
312
      clusters.put(clusterName, StatusOr.fromStatus(
×
313
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
314
      return;
×
315
    }
316

317
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
1✔
318
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
319
    if (!cdsWatcherDataOr.hasValue()) {
1✔
320
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
321
      return;
1✔
322
    }
323

324
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
325
    XdsConfig.XdsClusterConfig.ClusterChild child;
326
    switch (cdsUpdate.clusterType()) {
1✔
327
      case AGGREGATE:
328
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
329
        // preserves the priority across all aggregate clusters
330
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
331
        ancestors.add(clusterName);
1✔
332
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
333
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
334
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
335
          if (!config.hasValue()) {
1✔
336
            // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
337
            // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
338
            // watchers reports a transient ADS stream error, the policy should report that it is in
339
            // TRANSIENT_FAILURE if it has never passed a config to its child.
340
            //
341
            // But there's currently disagreement about whether that is actually what we want, and
342
            // that was not originally implemented in gRPC Java. So we're keeping Java's old
343
            // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
344
            // cycle).
345
            leafNames.add(childCluster);
1✔
346
            continue;
1✔
347
          }
348
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
349
          if (children instanceof AggregateConfig) {
1✔
350
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
351
          } else {
352
            leafNames.add(childCluster);
1✔
353
          }
354
        }
1✔
355
        ancestors.remove(clusterName);
1✔
356

357
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
358
        break;
1✔
359
      case EDS:
360
        TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
361
            tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
1✔
362
        if (edsWatcher != null) {
1✔
363
          child = new EndpointConfig(edsWatcher.getData());
1✔
364
        } else {
365
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
366
              "EDS resource not found for cluster " + clusterName)));
367
        }
368
        break;
×
369
      case LOGICAL_DNS:
370
        if (enableLogicalDns) {
1✔
371
          TrackedWatcher<List<EquivalentAddressGroup>> dnsWatcher =
1✔
372
              tracer.getWatcher(DNS_TYPE, cdsUpdate.dnsHostName());
1✔
373
          child = new EndpointConfig(dnsToEdsUpdate(dnsWatcher.getData(), cdsUpdate.dnsHostName()));
1✔
374
        } else {
1✔
375
          child = new EndpointConfig(StatusOr.fromStatus(
×
376
              Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
×
377
        }
378
        break;
×
379
      default:
380
        child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
381
              "Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
×
382
    }
383
    if (clusters.containsKey(clusterName)) {
1✔
384
      // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
385
      // present. We don't want to overwrite it with a non-error value.
386
      return;
1✔
387
    }
388
    clusters.put(clusterName, StatusOr.fromValue(
1✔
389
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
390
  }
1✔
391

392
  private static StatusOr<XdsEndpointResource.EdsUpdate> dnsToEdsUpdate(
393
      StatusOr<List<EquivalentAddressGroup>> dnsData, String dnsHostName) {
394
    if (!dnsData.hasValue()) {
1✔
395
      return StatusOr.fromStatus(dnsData.getStatus());
1✔
396
    }
397

398
    List<SocketAddress> addresses = new ArrayList<>();
1✔
399
    for (EquivalentAddressGroup eag : dnsData.getValue()) {
1✔
400
      addresses.addAll(eag.getAddresses());
1✔
401
    }
1✔
402
    EquivalentAddressGroup eag = new EquivalentAddressGroup(addresses);
1✔
403
    List<Endpoints.LbEndpoint> endpoints = ImmutableList.of(
1✔
404
        Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of()));
1✔
405
    LocalityLbEndpoints lbEndpoints =
1✔
406
        LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of());
1✔
407
    return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate(
1✔
408
        "fakeEds_logicalDns",
409
        Collections.singletonMap(LOGICAL_DNS_CLUSTER_LOCALITY, lbEndpoints),
1✔
410
        new ArrayList<>()));
411
  }
412

413
  private void addRdsWatcher(String resourceName) {
414
    if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
1✔
415
      return;
×
416
    }
417

418
    addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
1✔
419
  }
1✔
420

421
  private void addEdsWatcher(String edsServiceName) {
422
    if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
1✔
423
      return;
1✔
424
    }
425

426
    addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
1✔
427
  }
1✔
428

429
  private void addClusterWatcher(String clusterName) {
430
    if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
1✔
431
      return;
1✔
432
    }
433

434
    addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
1✔
435
  }
1✔
436

437
  private void addDnsWatcher(String dnsHostName) {
438
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
439
    if (getWatchers(DNS_TYPE).containsKey(dnsHostName)) {
1✔
440
      return;
×
441
    }
442

443
    DnsWatcher watcher = new DnsWatcher(dnsHostName, nameResolverArgs);
1✔
444
    getWatchers(DNS_TYPE).put(dnsHostName, watcher);
1✔
445
    watcher.start();
1✔
446
  }
1✔
447

448
  private void updateRoutes(List<VirtualHost> virtualHosts) {
449
    VirtualHost virtualHost =
1✔
450
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
451
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
452
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
453
  }
1✔
454

455
  private String nodeInfo() {
456
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
457
  }
458

459
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
460
    if (virtualHost == null) {
1✔
461
      return Collections.emptySet();
1✔
462
    }
463

464
    // Get all cluster names to which requests can be routed through the virtual host.
465
    Set<String> clusters = new HashSet<>();
1✔
466
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
467
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
468
      if (action == null) {
1✔
469
        continue;
1✔
470
      }
471
      if (action.cluster() != null) {
1✔
472
        clusters.add(action.cluster());
1✔
473
      } else if (action.weightedClusters() != null) {
1✔
474
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
475
          clusters.add(weighedCluster.name());
1✔
476
        }
1✔
477
      }
478
    }
1✔
479

480
    return clusters;
1✔
481
  }
482

483
  private static NameResolver createNameResolver(
484
      String dnsHostName,
485
      NameResolver.Args nameResolverArgs) {
486
    URI uri;
487
    try {
488
      uri = new URI("dns", "", "/" + dnsHostName, null);
1✔
489
    } catch (URISyntaxException e) {
×
490
      return new FailingNameResolver(
×
491
          Status.INTERNAL.withDescription("Bug, invalid URI creation: " + dnsHostName)
×
492
            .withCause(e));
×
493
    }
1✔
494

495
    NameResolverProvider provider =
1✔
496
        nameResolverArgs.getNameResolverRegistry().getProviderForScheme("dns");
1✔
497
    if (provider == null) {
1✔
498
      return new FailingNameResolver(
1✔
499
          Status.INTERNAL.withDescription("Could not find dns name resolver"));
1✔
500
    }
501

502
    NameResolver bareResolver = provider.newNameResolver(uri, nameResolverArgs);
1✔
503
    if (bareResolver == null) {
1✔
504
      return new FailingNameResolver(
×
505
          Status.INTERNAL.withDescription("DNS name resolver provider returned null: " + uri));
×
506
    }
507
    return RetryingNameResolver.wrap(bareResolver, nameResolverArgs);
1✔
508
  }
509

510
  private static class TypeWatchers<T> {
511
    // Key is resource name
512
    final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
1✔
513
    final TrackedWatcherType<T> watcherType;
514

515
    TypeWatchers(TrackedWatcherType<T> watcherType) {
1✔
516
      this.watcherType = checkNotNull(watcherType, "watcherType");
1✔
517
    }
1✔
518
  }
519

520
  public interface XdsConfigWatcher {
521
    /**
522
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
523
     * INTERNAL.
524
     */
525
    void onUpdate(StatusOr<XdsConfig> config);
526
  }
527

528
  private final class ClusterSubscription implements XdsConfig.Subscription {
529
    private final String clusterName;
530
    boolean closed; // Accessed from syncContext
531

532
    public ClusterSubscription(String clusterName) {
1✔
533
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
534
    }
1✔
535

536
    String getClusterName() {
537
      return clusterName;
1✔
538
    }
539

540
    @Override
541
    public void close() {
542
      releaseSubscription(this);
1✔
543
    }
1✔
544
  }
545

546
  /** State for tracing garbage collector. */
547
  private static final class WatcherTracer {
1✔
548
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
549
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
550

551
    public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
1✔
552
      this.resourceWatchers = resourceWatchers;
1✔
553

554
      this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
1✔
555
      for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
1✔
556
        usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
1✔
557
      }
1✔
558
    }
1✔
559

560
    private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
561
      return new TypeWatchers<T>(type);
1✔
562
    }
563

564
    public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
565
      TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
566
      if (typeWatchers == null) {
1✔
567
        return null;
×
568
      }
569
      assert typeWatchers.watcherType == watcherType;
1✔
570
      @SuppressWarnings("unchecked")
571
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
572
      TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
1✔
573
      if (watcher == null) {
1✔
574
        return null;
×
575
      }
576
      @SuppressWarnings("unchecked")
577
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
1✔
578
      usedTypeWatchers.watchers.put(name, watcher);
1✔
579
      return watcher;
1✔
580
    }
581

582
    /** Shut down unused watchers. */
583
    public void closeUnusedWatchers() {
584
      boolean changed = false; // Help out the GC by preferring old objects
1✔
585
      for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
1✔
586
        TypeWatchers<?> orig = resourceWatchers.get(key);
1✔
587
        TypeWatchers<?> used = usedWatchers.get(key);
1✔
588
        for (String name : orig.watchers.keySet()) {
1✔
589
          if (used.watchers.containsKey(name)) {
1✔
590
            continue;
1✔
591
          }
592
          orig.watchers.get(name).close();
1✔
593
          changed = true;
1✔
594
        }
1✔
595
      }
1✔
596
      if (changed) {
1✔
597
        resourceWatchers.putAll(usedWatchers);
1✔
598
      }
599
    }
1✔
600
  }
601

602
  @SuppressWarnings("UnusedTypeParameter")
603
  private static final class TrackedWatcherType<T> {
604
    public final TrackedWatcherTypeEnum typeEnum;
605

606
    public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
1✔
607
      this.typeEnum = checkNotNull(typeEnum, "typeEnum");
1✔
608
    }
1✔
609
  }
610

611
  private interface TrackedWatcher<T> {
612
    @Nullable
613
    StatusOr<T> getData();
614

615
    default boolean missingResult() {
616
      return getData() == null;
1✔
617
    }
618

619
    default boolean hasDataValue() {
620
      StatusOr<T> data = getData();
1✔
621
      return data != null && data.hasValue();
1✔
622
    }
623

624
    void close();
625
  }
626

627
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
628
      implements ResourceWatcher<T>, TrackedWatcher<T> {
629
    private final XdsResourceType<T> type;
630
    private final String resourceName;
631
    boolean cancelled;
632

633
    @Nullable
634
    private StatusOr<T> data;
635

636

637
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
638
      this.type = checkNotNull(type, "type");
1✔
639
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
640
    }
1✔
641

642
    @Override
643
    public void onError(Status error) {
644
      checkNotNull(error, "error");
1✔
645
      if (cancelled) {
1✔
646
        return;
×
647
      }
648
      // Don't update configuration on error, if we've already received configuration
649
      if (!hasDataValue()) {
1✔
650
        this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
651
            String.format("Error retrieving %s: %s: %s",
1✔
652
              toContextString(), error.getCode(), error.getDescription())));
1✔
653
        maybePublishConfig();
1✔
654
      }
655
    }
1✔
656

657
    @Override
658
    public void onResourceDoesNotExist(String resourceName) {
659
      if (cancelled) {
1✔
660
        return;
×
661
      }
662

663
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
664
      this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
665
          toContextString() + " does not exist" + nodeInfo()));
1✔
666
      maybePublishConfig();
1✔
667
    }
1✔
668

669
    @Override
670
    public void onChanged(T update) {
671
      checkNotNull(update, "update");
1✔
672
      if (cancelled) {
1✔
673
        return;
1✔
674
      }
675

676
      this.data = StatusOr.fromValue(update);
1✔
677
      subscribeToChildren(update);
1✔
678
      maybePublishConfig();
1✔
679
    }
1✔
680

681
    protected abstract void subscribeToChildren(T update);
682

683
    @Override
684
    public void close() {
685
      cancelled = true;
1✔
686
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
687
    }
1✔
688

689
    @Override
690
    @Nullable
691
    public StatusOr<T> getData() {
692
      return data;
1✔
693
    }
694

695
    public String toContextString() {
696
      return toContextStr(type.typeName(), resourceName);
1✔
697
    }
698
  }
699

700
  private interface RdsUpdateSupplier {
701
    StatusOr<RdsUpdate> getRdsUpdate();
702
  }
703

704
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
705
      implements RdsUpdateSupplier {
706

707
    private LdsWatcher(String resourceName) {
1✔
708
      super(XdsListenerResource.getInstance(), resourceName);
1✔
709
    }
1✔
710

711
    @Override
712
    public void subscribeToChildren(XdsListenerResource.LdsUpdate update) {
713
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
714
      List<VirtualHost> virtualHosts;
715
      if (httpConnectionManager == null) {
1✔
716
        // TCP listener. Unsupported config
717
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
718
      } else {
719
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
720
      }
721
      if (virtualHosts != null) {
1✔
722
        updateRoutes(virtualHosts);
1✔
723
      }
724

725
      String rdsName = getRdsName(update);
1✔
726
      if (rdsName != null) {
1✔
727
        addRdsWatcher(rdsName);
1✔
728
      }
729
    }
1✔
730

731
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
732
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
733
      if (httpConnectionManager == null) {
1✔
734
        // TCP listener. Unsupported config
735
        return null;
1✔
736
      }
737
      return httpConnectionManager.rdsName();
1✔
738
    }
739

740
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
741
      String rdsName = getRdsName(update);
1✔
742
      if (rdsName == null) {
1✔
743
        return null;
×
744
      }
745
      return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
1✔
746
    }
747

748
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
749
      if (!hasDataValue()) {
1✔
750
        return this;
×
751
      }
752
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
753
      if (hcm == null) {
1✔
754
        return this;
1✔
755
      }
756
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
757
      if (virtualHosts != null) {
1✔
758
        return this;
1✔
759
      }
760
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
761
      assert rdsWatcher != null;
1✔
762
      return rdsWatcher;
1✔
763
    }
764

765
    @Override
766
    public StatusOr<RdsUpdate> getRdsUpdate() {
767
      if (missingResult()) {
1✔
768
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
769
      }
770
      if (!getData().hasValue()) {
1✔
771
        return StatusOr.fromStatus(getData().getStatus());
×
772
      }
773
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
774
      if (hcm == null) {
1✔
775
        return StatusOr.fromStatus(
1✔
776
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
777
      }
778
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
779
      if (virtualHosts == null) {
1✔
780
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
781
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
782
        // bug
783
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
784
      }
785
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
786
    }
787
  }
788

789
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
790

791
    public RdsWatcher(String resourceName) {
1✔
792
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
793
    }
1✔
794

795
    @Override
796
    public void subscribeToChildren(RdsUpdate update) {
797
      updateRoutes(update.virtualHosts);
1✔
798
    }
1✔
799

800
    @Override
801
    public StatusOr<RdsUpdate> getRdsUpdate() {
802
      if (missingResult()) {
1✔
803
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
804
      }
805
      return getData();
1✔
806
    }
807
  }
808

809
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
810
    CdsWatcher(String resourceName) {
1✔
811
      super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
812
    }
1✔
813

814
    @Override
815
    public void subscribeToChildren(XdsClusterResource.CdsUpdate update) {
816
      switch (update.clusterType()) {
1✔
817
        case EDS:
818
          addEdsWatcher(getEdsServiceName());
1✔
819
          break;
1✔
820
        case LOGICAL_DNS:
821
          if (enableLogicalDns) {
1✔
822
            addDnsWatcher(update.dnsHostName());
1✔
823
          }
824
          break;
825
        case AGGREGATE:
826
          update.prioritizedClusterNames()
1✔
827
              .forEach(name -> addClusterWatcher(name));
1✔
828
          break;
1✔
829
        default:
830
      }
831
    }
1✔
832

833
    public String getEdsServiceName() {
834
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
835
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
836
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
837
      if (edsServiceName == null) {
1✔
838
        edsServiceName = cdsUpdate.clusterName();
×
839
      }
840
      return edsServiceName;
1✔
841
    }
842
  }
843

844
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
845
    private EdsWatcher(String resourceName) {
1✔
846
      super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
847
    }
1✔
848

849
    @Override
850
    public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
1✔
851
  }
852

853
  private final class DnsWatcher implements TrackedWatcher<List<EquivalentAddressGroup>> {
854
    private final NameResolver resolver;
855
    @Nullable
856
    private StatusOr<List<EquivalentAddressGroup>> data;
857
    private boolean cancelled;
858

859
    public DnsWatcher(String dnsHostName, NameResolver.Args nameResolverArgs) {
1✔
860
      this.resolver = createNameResolver(dnsHostName, nameResolverArgs);
1✔
861
    }
1✔
862

863
    public void start() {
864
      resolver.start(new NameResolverListener());
1✔
865
    }
1✔
866

867
    public void refresh() {
868
      if (cancelled) {
1✔
869
        return;
×
870
      }
871
      resolver.refresh();
1✔
872
    }
1✔
873

874
    @Override
875
    @Nullable
876
    public StatusOr<List<EquivalentAddressGroup>> getData() {
877
      return data;
1✔
878
    }
879

880
    @Override
881
    public void close() {
882
      if (cancelled) {
1✔
883
        return;
×
884
      }
885
      cancelled = true;
1✔
886
      resolver.shutdown();
1✔
887
    }
1✔
888

889
    private class NameResolverListener extends NameResolver.Listener2 {
1✔
890
      @Override
891
      public void onResult(final NameResolver.ResolutionResult resolutionResult) {
892
        syncContext.execute(() -> onResult2(resolutionResult));
×
893
      }
×
894

895
      @Override
896
      public Status onResult2(final NameResolver.ResolutionResult resolutionResult) {
897
        if (cancelled) {
1✔
898
          return Status.OK;
×
899
        }
900
        data = resolutionResult.getAddressesOrError();
1✔
901
        maybePublishConfig();
1✔
902
        return resolutionResult.getAddressesOrError().getStatus();
1✔
903
      }
904

905
      @Override
906
      public void onError(final Status error) {
907
        syncContext.execute(new Runnable() {
1✔
908
          @Override
909
          public void run() {
910
            if (cancelled) {
1✔
911
              return;
×
912
            }
913
            // DnsNameResolver cannot distinguish between address-not-found and transient errors.
914
            // Assume it is a transient error.
915
            // TODO: Once the resolution note API is available, don't throw away the error if
916
            // hasDataValue(); pass it as the note instead
917
            if (!hasDataValue()) {
1✔
918
              data = StatusOr.fromStatus(error);
1✔
919
              maybePublishConfig();
1✔
920
            }
921
          }
1✔
922
        });
923
      }
1✔
924
    }
925
  }
926

927
  private static final class FailingNameResolver extends NameResolver {
928
    private final Status status;
929

930
    public FailingNameResolver(Status status) {
1✔
931
      checkNotNull(status, "status");
1✔
932
      checkArgument(!status.isOk(), "Status must not be OK");
1✔
933
      this.status = status;
1✔
934
    }
1✔
935

936
    @Override
937
    public void start(Listener2 listener) {
938
      listener.onError(status);
1✔
939
    }
1✔
940

941
    @Override
942
    public String getServiceAuthority() {
943
      return "bug-if-you-see-this-authority";
×
944
    }
945

946
    @Override
947
    public void shutdown() {}
1✔
948
  }
949
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc