• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19850

05 Jun 2025 03:43PM UTC coverage: 88.616% (-0.03%) from 88.646%
#19850

push

github

ejona86
xds: Fix XdsDepManager aggregate cluster child ordering and loop detection

The children of aggregate clusters have a priority order, so we can't
ever throw them in an ordinary set for later iteration.

This now detects recursion limits only after subscribing, but that
matches our existing behavior in CdsLoadBalancer2. We don't get much
value detecting the limit before subscribing and doing so makes watcher
types more different.

Loops are still a bit broken as they won't be unwatched when orphaned,
as they will form a reference loop. In CdsLoadBalancer2, duplicate
clusters had duplicate watchers so there was single-ownership and
reference cycles couldn't form. Fixing that is a bigger change.

Intermediate aggregate clusters are now included in XdsConfig, just for
simplicity. It doesn't hurt anything whether they are present or
missing, but it required updates to some tests.

34764 of 39230 relevant lines covered (88.62%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.65
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.Sets;
27
import io.grpc.InternalLogId;
28
import io.grpc.NameResolver;
29
import io.grpc.Status;
30
import io.grpc.StatusOr;
31
import io.grpc.SynchronizationContext;
32
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
33
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
34
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
35
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
36
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
37
import io.grpc.xds.client.XdsClient;
38
import io.grpc.xds.client.XdsClient.ResourceWatcher;
39
import io.grpc.xds.client.XdsLogger;
40
import io.grpc.xds.client.XdsResourceType;
41
import java.io.Closeable;
42
import java.io.IOException;
43
import java.util.Collections;
44
import java.util.HashMap;
45
import java.util.HashSet;
46
import java.util.LinkedHashSet;
47
import java.util.List;
48
import java.util.Map;
49
import java.util.Objects;
50
import java.util.Set;
51
import java.util.concurrent.ScheduledExecutorService;
52
import javax.annotation.Nullable;
53

54
/**
55
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
56
 * maintains the watchers for the xds resources and when an update is received, it either requests
57
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
58
 * applies to a single data plane authority.
59
 */
60
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
61
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
62
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
63
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
64
  private final XdsClient xdsClient;
65
  private final XdsConfigWatcher xdsConfigWatcher;
66
  private final SynchronizationContext syncContext;
67
  private final String dataPlaneAuthority;
68

69
  private final InternalLogId logId;
70
  private final XdsLogger logger;
71
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
72
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
73

74
  /**
   * Creates the manager and schedules, on {@code syncContext}, the initial LDS watch for
   * {@code listenerName}.
   *
   * <p>{@code nameResolverArgs} and {@code scheduler} are only null-checked here.
   */
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
                       SynchronizationContext syncContext, String dataPlaneAuthority,
                       String listenerName, NameResolver.Args nameResolverArgs,
                       ScheduledExecutorService scheduler) {
    this.logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
    this.logger = XdsLogger.withLogId(this.logId);

    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
    checkNotNull(nameResolverArgs, "nameResolverArgs");
    checkNotNull(scheduler, "scheduler");

    // Everything else hangs off the listener watcher, so registering it starts the whole chain.
    syncContext.execute(() -> {
      LdsWatcher listenerWatcher = new LdsWatcher(listenerName);
      addWatcher(listenerWatcher);
    });
  }
1✔
90

91
  public static String toContextStr(String typeName, String resourceName) {
92
    return typeName + " resource " + resourceName;
1✔
93
  }
94

95
  @Override
96
  public Closeable subscribeToCluster(String clusterName) {
97
    checkNotNull(clusterName, "clusterName");
1✔
98
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
99

100
    syncContext.execute(() -> {
1✔
101
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
102
        subscription.closed = true;
1✔
103
        return; // shutdown() called
1✔
104
      }
105
      addClusterWatcher(clusterName, subscription);
1✔
106
    });
1✔
107

108
    return subscription;
1✔
109
  }
110

111
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
112
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
113
    XdsResourceType<T> type = watcher.type;
1✔
114
    String resourceName = watcher.resourceName;
1✔
115

116
    getWatchers(type).put(resourceName, watcher);
1✔
117
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
118
  }
1✔
119

120
  private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
121
    if (watcher == null) {
1✔
122
      return;
×
123
    }
124
    watcher.parentContexts.remove(parentContext);
1✔
125
    if (watcher.parentContexts.isEmpty()) {
1✔
126
      cancelWatcher(watcher);
1✔
127
    }
128
  }
1✔
129

130
  private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
131
    if (watcher == null) {
1✔
132
      return;
×
133
    }
134
    watcher.parentContexts.remove(parentContext);
1✔
135
    if (watcher.parentContexts.isEmpty()) {
1✔
136
      cancelWatcher(watcher);
1✔
137
    }
138
  }
1✔
139

140
  private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
141
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
142

143
    if (watcher == null) {
1✔
144
      return;
×
145
    }
146

147
    if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
1✔
148
      throwIfParentContextsNotEmpty(watcher);
1✔
149
    }
150

151
    watcher.cancelled = true;
1✔
152
    XdsResourceType<T> type = watcher.type;
1✔
153
    String resourceName = watcher.resourceName;
1✔
154

155
    if (getWatchers(type).remove(resourceName) == null) {
1✔
156
      logger.log(DEBUG, "Trying to cancel watcher {0}, but it isn't watched", watcher);
×
157
      return;
×
158
    }
159

160
    xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
1✔
161
  }
1✔
162

163
  private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
164
    if (watcher instanceof CdsWatcher) {
1✔
165
      CdsWatcher cdsWatcher = (CdsWatcher) watcher;
1✔
166
      if (!cdsWatcher.parentContexts.isEmpty()) {
1✔
167
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
168
            cdsWatcher.resourceName(), cdsWatcher.parentContexts);
×
169
        throw new IllegalStateException(msg);
×
170
      }
171
    } else if (watcher instanceof EdsWatcher) {
1✔
172
      EdsWatcher edsWatcher = (EdsWatcher) watcher;
1✔
173
      if (!edsWatcher.parentContexts.isEmpty()) {
1✔
174
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
175
            edsWatcher.resourceName(), edsWatcher.parentContexts);
×
176
        throw new IllegalStateException(msg);
×
177
      }
178
    }
179
  }
1✔
180

181
  public void shutdown() {
182
    syncContext.execute(() -> {
1✔
183
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
184
        shutdownWatchersForType(watchers);
1✔
185
      }
1✔
186
      resourceWatchers.clear();
1✔
187
    });
1✔
188
  }
1✔
189

190
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
191
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
192
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
193
          watcherEntry.getValue());
1✔
194
      watcherEntry.getValue().cancelled = true;
1✔
195
    }
1✔
196
  }
1✔
197

198
  private void releaseSubscription(ClusterSubscription subscription) {
199
    checkNotNull(subscription, "subscription");
1✔
200
    String clusterName = subscription.getClusterName();
1✔
201
    syncContext.execute(() -> {
1✔
202
      if (subscription.closed) {
1✔
203
        return;
1✔
204
      }
205
      subscription.closed = true;
1✔
206
      XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher
1✔
207
          = getWatchers(CLUSTER_RESOURCE).get(clusterName);
1✔
208
      if (cdsWatcher == null) {
1✔
209
        return; // shutdown() called
×
210
      }
211
      cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
1✔
212
      maybePublishConfig();
1✔
213
    });
1✔
214
  }
1✔
215

216
  private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
217
    checkNotNull(root, "root");
1✔
218

219
    cancelCdsWatcher(root, parentContext);
1✔
220

221
    if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
1✔
222
      return;
1✔
223
    }
224

225
    XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
1✔
226
    switch (cdsUpdate.clusterType()) {
1✔
227
      case EDS:
228
        String edsServiceName = root.getEdsServiceName();
1✔
229
        EdsWatcher edsWatcher = (EdsWatcher) getWatchers(ENDPOINT_RESOURCE).get(edsServiceName);
1✔
230
        cancelEdsWatcher(edsWatcher, root);
1✔
231
        break;
1✔
232
      case AGGREGATE:
233
        for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
234
          CdsWatcher clusterWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(cluster);
1✔
235
          if (clusterWatcher != null) {
1✔
236
            cancelClusterWatcherTree(clusterWatcher, root);
1✔
237
          }
238
        }
1✔
239
        break;
1✔
240
      case LOGICAL_DNS:
241
        // no eds needed, so everything happens in cancelCdsWatcher()
242
        break;
×
243
      default:
244
        throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
×
245
    }
246
  }
1✔
247

248
  /**
249
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
250
   * the watchers.
251
   */
252
  private void maybePublishConfig() {
253
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
254
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
255
      return; // shutdown() called
×
256
    }
257
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
258
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
259
        .anyMatch(XdsWatcherBase::missingResult);
1✔
260
    if (waitingOnResource) {
1✔
261
      return;
1✔
262
    }
263

264
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
265
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
266
      return;
1✔
267
    }
268
    assert newUpdate.hasValue()
1✔
269
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
270
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
271
    lastUpdate = newUpdate;
1✔
272
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
273
  }
1✔
274

275
  @VisibleForTesting
276
  StatusOr<XdsConfig> buildUpdate() {
277
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
278

279
    // Iterate watchers and build the XdsConfig
280

281
    // Will only be 1 listener and 1 route resource
282
    RdsUpdateSupplier routeSource = null;
1✔
283
    for (XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher :
284
        getWatchers(XdsListenerResource.getInstance()).values()) {
1✔
285
      if (!ldsWatcher.getData().hasValue()) {
1✔
286
        return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
287
      }
288
      XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
289
      builder.setListener(ldsUpdate);
1✔
290
      routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
1✔
291
    }
1✔
292

293
    if (routeSource == null) {
1✔
294
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
295
          "Bug: No route source found for listener " + dataPlaneAuthority));
296
    }
297

298
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
299
    if (!statusOrRdsUpdate.hasValue()) {
1✔
300
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
301
    }
302
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
303
    builder.setRoute(rdsUpdate);
1✔
304

305
    VirtualHost activeVirtualHost =
1✔
306
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
307
    if (activeVirtualHost == null) {
1✔
308
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
309
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
310
    }
311
    builder.setVirtualHost(activeVirtualHost);
1✔
312

313
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
314
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
315
    for (String cluster : getWatchers(CLUSTER_RESOURCE).keySet()) {
1✔
316
      addConfigForCluster(clusters, cluster, ancestors);
1✔
317
    }
1✔
318
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
319
      builder.addCluster(me.getKey(), me.getValue());
1✔
320
    }
1✔
321

322
    return StatusOr.fromValue(builder.build());
1✔
323
  }
324

325
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
326
      XdsResourceType<T> resourceType) {
327
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
328
    if (typeWatchers == null) {
1✔
329
      typeWatchers = new TypeWatchers<T>(resourceType);
1✔
330
      resourceWatchers.put(resourceType, typeWatchers);
1✔
331
    }
332
    assert typeWatchers.resourceType == resourceType;
1✔
333
    @SuppressWarnings("unchecked")
334
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
335
    return tTypeWatchers.watchers;
1✔
336
  }
337

338
  private void addConfigForCluster(
339
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
340
      String clusterName,
341
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
342
      LinkedHashSet<String> ancestors) {
343
    if (clusters.containsKey(clusterName)) {
1✔
344
      return;
1✔
345
    }
346
    if (ancestors.contains(clusterName)) {
1✔
347
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
348
          Status.INTERNAL.withDescription(
1✔
349
              "Aggregate cluster cycle detected: " + ancestors)));
350
      return;
1✔
351
    }
352
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
353
      clusters.put(clusterName, StatusOr.fromStatus(
×
354
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
355
      return;
×
356
    }
357

358
    CdsWatcher cdsWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName);
1✔
359
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
360
    if (!cdsWatcherDataOr.hasValue()) {
1✔
361
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
362
      return;
1✔
363
    }
364

365
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
366
    XdsConfig.XdsClusterConfig.ClusterChild child;
367
    switch (cdsUpdate.clusterType()) {
1✔
368
      case AGGREGATE:
369
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
370
        // preserves the priority across all aggregate clusters
371
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
372
        ancestors.add(clusterName);
1✔
373
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
374
          addConfigForCluster(clusters, childCluster, ancestors);
1✔
375
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
376
          if (!config.hasValue()) {
1✔
377
            clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
1✔
378
                "Unable to get leaves for " + clusterName + ": "
379
                + config.getStatus().getDescription())));
1✔
380
            return;
1✔
381
          }
382
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
383
          if (children instanceof AggregateConfig) {
1✔
384
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
385
          } else {
386
            leafNames.add(childCluster);
1✔
387
          }
388
        }
1✔
389
        ancestors.remove(clusterName);
1✔
390

391
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
392
        break;
1✔
393
      case EDS:
394
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
395
            getWatchers(ENDPOINT_RESOURCE).get(cdsWatcher.getEdsServiceName());
1✔
396
        if (edsWatcher != null) {
1✔
397
          child = new EndpointConfig(edsWatcher.getData());
1✔
398
        } else {
399
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
400
              "EDS resource not found for cluster " + clusterName)));
401
        }
402
        break;
×
403
      case LOGICAL_DNS:
404
        // TODO get the resolved endpoint configuration
405
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
406
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
407
        break;
1✔
408
      default:
409
        throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
410
    }
411
    clusters.put(clusterName, StatusOr.fromValue(
1✔
412
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
413
  }
1✔
414

415
  @Override
416
  public String toString() {
417
    return logId.toString();
×
418
  }
419

420
  private void addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
421
    EdsWatcher watcher
1✔
422
        = (EdsWatcher) getWatchers(XdsEndpointResource.getInstance()).get(edsServiceName);
1✔
423
    if (watcher != null) {
1✔
424
      watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
1✔
425
      return;
1✔
426
    }
427

428
    addWatcher(new EdsWatcher(edsServiceName, parentContext));
1✔
429
  }
1✔
430

431
  private void addClusterWatcher(String clusterName, Object parentContext) {
432
    CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName);
1✔
433
    if (watcher != null) {
1✔
434
      watcher.parentContexts.add(parentContext);
1✔
435
      return;
1✔
436
    }
437

438
    addWatcher(new CdsWatcher(clusterName, parentContext));
1✔
439
  }
1✔
440

441
  private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
442
                            List<VirtualHost> oldVirtualHosts, boolean sameParentContext) {
443
    VirtualHost oldVirtualHost =
1✔
444
        RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority);
1✔
445
    VirtualHost virtualHost =
1✔
446
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
447

448
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
449
    Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
1✔
450

451
    if (sameParentContext) {
1✔
452
      // Calculate diffs.
453
      Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
1✔
454
      Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
1✔
455

456
      deletedClusters.forEach(watcher ->
1✔
457
          cancelClusterWatcherTree(getCluster(watcher), newParentContext));
1✔
458
      addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext));
1✔
459
    } else {
1✔
460
      newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext));
1✔
461
    }
462
  }
1✔
463

464
  private String nodeInfo() {
465
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
466
  }
467

468
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
469
    if (virtualHost == null) {
1✔
470
      return Collections.emptySet();
1✔
471
    }
472

473
    // Get all cluster names to which requests can be routed through the virtual host.
474
    Set<String> clusters = new HashSet<>();
1✔
475
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
476
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
477
      if (action == null) {
1✔
478
        continue;
1✔
479
      }
480
      if (action.cluster() != null) {
1✔
481
        clusters.add(action.cluster());
1✔
482
      } else if (action.weightedClusters() != null) {
1✔
483
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
484
          clusters.add(weighedCluster.name());
1✔
485
        }
1✔
486
      }
487
    }
1✔
488

489
    return clusters;
1✔
490
  }
491

492
  private CdsWatcher getCluster(String clusterName) {
493
    return (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName);
1✔
494
  }
495

496
  private static class TypeWatchers<T extends ResourceUpdate> {
497
    // Key is resource name
498
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
499
    final XdsResourceType<T> resourceType;
500

501
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
502
      this.resourceType = resourceType;
1✔
503
    }
1✔
504
  }
505

506
  public interface XdsConfigWatcher {
507
    /**
508
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
509
     * INTERNAL.
510
     */
511
    void onUpdate(StatusOr<XdsConfig> config);
512
  }
513

514
  private final class ClusterSubscription implements Closeable {
515
    private final String clusterName;
516
    boolean closed; // Accessed from syncContext
517

518
    public ClusterSubscription(String clusterName) {
1✔
519
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
520
    }
1✔
521

522
    String getClusterName() {
523
      return clusterName;
1✔
524
    }
525

526
    @Override
527
    public void close() throws IOException {
528
      releaseSubscription(this);
1✔
529
    }
1✔
530
  }
531

532
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
533
      implements ResourceWatcher<T> {
534
    private final XdsResourceType<T> type;
535
    private final String resourceName;
536
    boolean cancelled;
537

538
    @Nullable
539
    private StatusOr<T> data;
540

541

542
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
543
      this.type = checkNotNull(type, "type");
1✔
544
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
545
    }
1✔
546

547
    @Override
548
    public void onError(Status error) {
549
      checkNotNull(error, "error");
1✔
550
      if (cancelled) {
1✔
551
        return;
×
552
      }
553
      // Don't update configuration on error, if we've already received configuration
554
      if (!hasDataValue()) {
1✔
555
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
556
            String.format("Error retrieving %s: %s: %s",
1✔
557
              toContextString(), error.getCode(), error.getDescription())));
1✔
558
        maybePublishConfig();
1✔
559
      }
560
    }
1✔
561

562
    @Override
563
    public void onResourceDoesNotExist(String resourceName) {
564
      if (cancelled) {
1✔
565
        return;
1✔
566
      }
567

568
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
569
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
570
          toContextString() + " does not exist" + nodeInfo()));
1✔
571
      maybePublishConfig();
1✔
572
    }
1✔
573

574
    boolean missingResult() {
575
      return data == null;
1✔
576
    }
577

578
    @Nullable
579
    StatusOr<T> getData() {
580
      return data;
1✔
581
    }
582

583
    boolean hasDataValue() {
584
      return data != null && data.hasValue();
1✔
585
    }
586

587
    String resourceName() {
588
      return resourceName;
1✔
589
    }
590

591
    protected void setData(T data) {
592
      checkNotNull(data, "data");
1✔
593
      this.data = StatusOr.fromValue(data);
1✔
594
    }
1✔
595

596
    protected void setDataAsStatus(Status status) {
597
      checkNotNull(status, "status");
1✔
598
      this.data = StatusOr.fromStatus(status);
1✔
599
    }
1✔
600

601
    public String toContextString() {
602
      return toContextStr(type.typeName(), resourceName);
1✔
603
    }
604
  }
605

606
  private interface RdsUpdateSupplier {
607
    StatusOr<RdsUpdate> getRdsUpdate();
608
  }
609

610
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
611
      implements RdsUpdateSupplier {
612
    String rdsName;
613

614
    private LdsWatcher(String resourceName) {
1✔
615
      super(XdsListenerResource.getInstance(), resourceName);
1✔
616
    }
1✔
617

618
    @Override
619
    public void onChanged(XdsListenerResource.LdsUpdate update) {
620
      checkNotNull(update, "update");
1✔
621
      if (cancelled) {
1✔
622
        return;
1✔
623
      }
624

625
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
626
      List<VirtualHost> virtualHosts;
627
      String rdsName;
628
      if (httpConnectionManager == null) {
1✔
629
        // TCP listener. Unsupported config
630
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
631
        rdsName = null;
1✔
632
      } else {
633
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
634
        rdsName = httpConnectionManager.rdsName();
1✔
635
      }
636
      StatusOr<RdsUpdate> activeRdsUpdate = getRouteSource().getRdsUpdate();
1✔
637
      List<VirtualHost> activeVirtualHosts = activeRdsUpdate.hasValue()
1✔
638
          ? activeRdsUpdate.getValue().virtualHosts
1✔
639
          : Collections.emptyList();
1✔
640

641
      boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
1✔
642
      if (changedRdsName) {
1✔
643
        cleanUpRdsWatcher();
1✔
644
      }
645

646
      if (virtualHosts != null) {
1✔
647
        // No RDS watcher since we are getting RDS updates via LDS
648
        updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null);
1✔
649
        this.rdsName = null;
1✔
650
      } else if (changedRdsName) {
1✔
651
        this.rdsName = rdsName;
1✔
652
        addWatcher(new RdsWatcher(rdsName));
1✔
653
        logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
654
      }
655

656
      setData(update);
1✔
657
      maybePublishConfig();
1✔
658
    }
1✔
659

660
    @Override
661
    public void onResourceDoesNotExist(String resourceName) {
662
      if (cancelled) {
1✔
663
        return;
×
664
      }
665

666
      checkArgument(resourceName().equals(resourceName), "Resource name does not match");
1✔
667
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
668
          toContextString() + " does not exist" + nodeInfo()));
1✔
669
      cleanUpRdsWatcher();
1✔
670
      rdsName = null;
1✔
671
      maybePublishConfig();
1✔
672
    }
1✔
673

674
    private void cleanUpRdsWatcher() {
675
      RdsWatcher oldRdsWatcher = getRdsWatcher();
1✔
676
      if (oldRdsWatcher != null) {
1✔
677
        cancelWatcher(oldRdsWatcher);
1✔
678
        logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
1✔
679

680
        // Cleanup clusters (as appropriate) that had the old rds watcher as a parent
681
        if (!oldRdsWatcher.hasDataValue()) {
1✔
682
          return;
×
683
        }
684
        for (XdsWatcherBase<XdsClusterResource.CdsUpdate> watcher :
685
            getWatchers(CLUSTER_RESOURCE).values()) {
1✔
686
          cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
1✔
687
        }
1✔
688
      }
689
    }
1✔
690

691
    private RdsWatcher getRdsWatcher() {
692
      if (rdsName == null) {
1✔
693
        return null;
1✔
694
      }
695
      return (RdsWatcher) getWatchers(XdsRouteConfigureResource.getInstance()).get(rdsName);
1✔
696
    }
697

698
    public RdsUpdateSupplier getRouteSource() {
699
      if (!hasDataValue()) {
1✔
700
        return this;
1✔
701
      }
702
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
703
      if (hcm == null) {
1✔
704
        return this;
1✔
705
      }
706
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
707
      if (virtualHosts != null) {
1✔
708
        return this;
1✔
709
      }
710
      RdsWatcher rdsWatcher = getRdsWatcher();
1✔
711
      assert rdsWatcher != null;
1✔
712
      return rdsWatcher;
1✔
713
    }
714

715
    @Override
716
    public StatusOr<RdsUpdate> getRdsUpdate() {
717
      if (missingResult()) {
1✔
718
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
1✔
719
      }
720
      if (!getData().hasValue()) {
1✔
721
        return StatusOr.fromStatus(getData().getStatus());
1✔
722
      }
723
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
724
      if (hcm == null) {
1✔
725
        return StatusOr.fromStatus(
1✔
726
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
727
      }
728
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
729
      if (virtualHosts == null) {
1✔
730
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
731
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
732
        // bug
733
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
734
      }
735
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
736
    }
737
  }
738

739
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
740

741
    public RdsWatcher(String resourceName) {
1✔
742
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
743
    }
1✔
744

745
    @Override
746
    public void onChanged(RdsUpdate update) {
747
      checkNotNull(update, "update");
1✔
748
      if (cancelled) {
1✔
749
        return;
1✔
750
      }
751
      List<VirtualHost> oldVirtualHosts = hasDataValue()
1✔
752
          ? getData().getValue().virtualHosts
1✔
753
          : Collections.emptyList();
1✔
754
      setData(update);
1✔
755
      updateRoutes(update.virtualHosts, this, oldVirtualHosts, true);
1✔
756
      maybePublishConfig();
1✔
757
    }
1✔
758

759
    @Override
760
    public StatusOr<RdsUpdate> getRdsUpdate() {
761
      if (missingResult()) {
1✔
762
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
763
      }
764
      return getData();
1✔
765
    }
766
  }
767

768
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
769
    Set<Object> parentContexts = new HashSet<>();
1✔
770

771
    CdsWatcher(String resourceName, Object parentContext) {
1✔
772
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
773
      this.parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
774
    }
1✔
775

776
    @Override
777
    public void onChanged(XdsClusterResource.CdsUpdate update) {
778
      checkNotNull(update, "update");
1✔
779
      if (cancelled) {
1✔
780
        return;
1✔
781
      }
782
      switch (update.clusterType()) {
1✔
783
        case EDS:
784
          setData(update);
1✔
785
          addEdsWatcher(getEdsServiceName(), this);
1✔
786
          break;
1✔
787
        case LOGICAL_DNS:
788
          setData(update);
1✔
789
          // no eds needed
790
          break;
1✔
791
        case AGGREGATE:
792
          Object parentContext = this;
1✔
793
          if (hasDataValue()) {
1✔
794
            Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
1✔
795
                ? new HashSet<>(getData().getValue().prioritizedClusterNames())
1✔
796
                : Collections.emptySet();
1✔
797
            Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
1✔
798

799
            Set<String> deletedClusters = Sets.difference(oldNames, newNames);
1✔
800
            deletedClusters.forEach((cluster)
1✔
801
                -> cancelClusterWatcherTree(getCluster(cluster), parentContext));
1✔
802

803
            setData(update);
1✔
804
            Set<String> addedClusters = Sets.difference(newNames, oldNames);
1✔
805
            addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext));
1✔
806
          } else {
1✔
807
            setData(update);
1✔
808
            update.prioritizedClusterNames()
1✔
809
                .forEach(name -> addClusterWatcher(name, parentContext));
1✔
810
          }
811
          break;
1✔
812
        default:
813
          Status error = Status.UNAVAILABLE.withDescription(
×
814
              "unknown cluster type in " + resourceName() + " " + update.clusterType());
×
815
          setDataAsStatus(error);
×
816
      }
817
      maybePublishConfig();
1✔
818
    }
1✔
819

820
    public String getEdsServiceName() {
821
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
822
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
823
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
824
      if (edsServiceName == null) {
1✔
825
        edsServiceName = cdsUpdate.clusterName();
×
826
      }
827
      return edsServiceName;
1✔
828
    }
829
  }
830

831
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
832
    private final Set<CdsWatcher> parentContexts = new HashSet<>();
1✔
833

834
    private EdsWatcher(String resourceName, CdsWatcher parentContext) {
1✔
835
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
836
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
837
    }
1✔
838

839
    @Override
840
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
841
      if (cancelled) {
1✔
842
        return;
1✔
843
      }
844
      setData(checkNotNull(update, "update"));
1✔
845
      maybePublishConfig();
1✔
846
    }
1✔
847

848
    void addParentContext(CdsWatcher parentContext) {
849
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
850
    }
1✔
851
  }
852
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc