• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19687

11 Feb 2025 10:34PM CUT coverage: 88.605% (+0.02%) from 88.587%
#19687

push

github

web-flow
xds: Cleanup by moving methods in XdsDependencyManager ahead of classes (#11890)

* Move private methods ahead of classes

34245 of 38649 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.39
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.Sets;
27
import io.grpc.InternalLogId;
28
import io.grpc.Status;
29
import io.grpc.StatusOr;
30
import io.grpc.SynchronizationContext;
31
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
32
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
34
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
35
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
36
import io.grpc.xds.client.XdsClient;
37
import io.grpc.xds.client.XdsClient.ResourceWatcher;
38
import io.grpc.xds.client.XdsLogger;
39
import io.grpc.xds.client.XdsResourceType;
40
import java.io.Closeable;
41
import java.io.IOException;
42
import java.util.ArrayList;
43
import java.util.Collections;
44
import java.util.HashMap;
45
import java.util.HashSet;
46
import java.util.List;
47
import java.util.Map;
48
import java.util.Objects;
49
import java.util.Set;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
55
 * maintains the watchers for the xds resources and when an update is received, it either requests
56
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
57
 * applies to a single data plane authority.
58
 */
59
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
60
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
61
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
62
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
63
  private final XdsClient xdsClient;
64
  private final XdsConfigWatcher xdsConfigWatcher;
65
  private final SynchronizationContext syncContext;
66
  private final String dataPlaneAuthority;
67

68
  private final InternalLogId logId;
69
  private final XdsLogger logger;
70
  private XdsConfig lastXdsConfig = null;
1✔
71
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
72

73
  /**
   * Creates the manager and schedules the initial listener (LDS) watch.
   *
   * @param xdsClient client used to start and cancel resource watches; must not be null
   * @param xdsConfigWatcher receiver of config updates and LDS/RDS errors; must not be null
   * @param syncContext context in which all watcher bookkeeping runs; must not be null
   * @param dataPlaneAuthority hostname used to select the virtual host; must not be null
   * @param listenerName name of the listener resource to watch, also used for the log id
   */
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
                       SynchronizationContext syncContext, String dataPlaneAuthority,
                       String listenerName) {
    this.logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
    this.logger = XdsLogger.withLogId(this.logId);
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");

    // Start the ball rolling: addWatcher() must run inside the synchronization context.
    syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
  }
1✔
86

87
  public static String  toContextStr(String typeName, String resourceName) {
88
    return typeName + " resource: " + resourceName;
1✔
89
  }
90

91
  @Override
92
  public Closeable subscribeToCluster(String clusterName) {
93

94
    checkNotNull(clusterName, "clusterName");
1✔
95
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
96

97
    syncContext.execute(() -> {
1✔
98
      addClusterWatcher(clusterName, subscription, 1);
1✔
99
      maybePublishConfig();
1✔
100
    });
1✔
101

102
    return subscription;
1✔
103
  }
104

105
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
106
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
107
    XdsResourceType<T> type = watcher.type;
1✔
108
    String resourceName = watcher.resourceName;
1✔
109

110
    @SuppressWarnings("unchecked")
111
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
112
    if (typeWatchers == null) {
1✔
113
      typeWatchers = new TypeWatchers<>(type);
1✔
114
      resourceWatchers.put(type, typeWatchers);
1✔
115
    }
116

117
    typeWatchers.add(resourceName, watcher);
1✔
118
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
119
  }
1✔
120

121
  private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
122
    if (watcher == null) {
1✔
123
      return;
×
124
    }
125
    watcher.parentContexts.remove(parentContext);
1✔
126
    if (watcher.parentContexts.isEmpty()) {
1✔
127
      cancelWatcher(watcher);
1✔
128
    }
129
  }
1✔
130

131
  private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
132
    if (watcher == null) {
1✔
133
      return;
×
134
    }
135
    watcher.parentContexts.remove(parentContext);
1✔
136
    if (watcher.parentContexts.isEmpty()) {
1✔
137
      cancelWatcher(watcher);
1✔
138
    }
139
  }
1✔
140

141
  private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
142
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
143

144
    if (watcher == null) {
1✔
145
      return;
×
146
    }
147

148
    if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
1✔
149
      throwIfParentContextsNotEmpty(watcher);
1✔
150
    }
151

152
    XdsResourceType<T> type = watcher.type;
1✔
153
    String resourceName = watcher.resourceName;
1✔
154

155
    @SuppressWarnings("unchecked")
156
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
157
    if (typeWatchers == null) {
1✔
158
      logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
×
159
      return;
×
160
    }
161

162
    typeWatchers.watchers.remove(resourceName);
1✔
163
    xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
1✔
164

165
  }
1✔
166

167
  private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
168
    if (watcher instanceof CdsWatcher) {
1✔
169
      CdsWatcher cdsWatcher = (CdsWatcher) watcher;
1✔
170
      if (!cdsWatcher.parentContexts.isEmpty()) {
1✔
171
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
172
            cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
×
173
        throw new IllegalStateException(msg);
×
174
      }
175
    } else if (watcher instanceof EdsWatcher) {
1✔
176
      EdsWatcher edsWatcher = (EdsWatcher) watcher;
1✔
177
      if (!edsWatcher.parentContexts.isEmpty()) {
1✔
178
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
179
            edsWatcher.resourceName(), edsWatcher.parentContexts);
×
180
        throw new IllegalStateException(msg);
×
181
      }
182
    }
183
  }
1✔
184

185
  public void shutdown() {
186
    syncContext.execute(() -> {
1✔
187
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
188
        shutdownWatchersForType(watchers);
1✔
189
      }
1✔
190
      resourceWatchers.clear();
1✔
191
    });
1✔
192
  }
1✔
193

194
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
195
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
196
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
197
          watcherEntry.getValue());
1✔
198
    }
1✔
199
  }
1✔
200

201
  private void releaseSubscription(ClusterSubscription subscription) {
202
    checkNotNull(subscription, "subscription");
1✔
203
    String clusterName = subscription.getClusterName();
1✔
204
    syncContext.execute(() -> {
1✔
205
      XdsWatcherBase<?> cdsWatcher =
1✔
206
          resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
207
      if (cdsWatcher == null) {
1✔
208
        return; // already released while waiting for the syncContext
×
209
      }
210
      cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
1✔
211
      maybePublishConfig();
1✔
212
    });
1✔
213
  }
1✔
214

215
  private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
216
    checkNotNull(root, "root");
1✔
217

218
    cancelCdsWatcher(root, parentContext);
1✔
219

220
    if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
1✔
221
      return;
1✔
222
    }
223

224
    XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
1✔
225
    switch (cdsUpdate.clusterType()) {
1✔
226
      case EDS:
227
        String edsServiceName = cdsUpdate.edsServiceName();
1✔
228
        EdsWatcher edsWatcher =
1✔
229
            (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
1✔
230
        cancelEdsWatcher(edsWatcher, root);
1✔
231
        break;
1✔
232
      case AGGREGATE:
233
        for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
234
          CdsWatcher clusterWatcher =
1✔
235
              (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
1✔
236
          if (clusterWatcher != null) {
1✔
237
            cancelClusterWatcherTree(clusterWatcher, root);
1✔
238
          }
239
        }
1✔
240
        break;
1✔
241
      case LOGICAL_DNS:
242
        // no eds needed
243
        break;
×
244
      default:
245
        throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
×
246
    }
247
  }
1✔
248

249
  /**
250
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
251
   * the watchers.
252
   */
253
  private void maybePublishConfig() {
254
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
255
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
256
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
257
        .anyMatch(XdsWatcherBase::missingResult);
1✔
258
    if (waitingOnResource) {
1✔
259
      return;
1✔
260
    }
261

262
    XdsConfig newConfig = buildConfig();
1✔
263
    if (Objects.equals(newConfig, lastXdsConfig)) {
1✔
264
      return;
1✔
265
    }
266
    lastXdsConfig = newConfig;
1✔
267
    xdsConfigWatcher.onUpdate(lastXdsConfig);
1✔
268
  }
1✔
269

270
  @VisibleForTesting
271
  XdsConfig buildConfig() {
272
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
273

274
    // Iterate watchers and build the XdsConfig
275

276
    // Will only be 1 listener and 1 route resource
277
    VirtualHost activeVirtualHost = getActiveVirtualHost();
1✔
278
    for (XdsWatcherBase<?> xdsWatcherBase :
279
        resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) {
1✔
280
      XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue();
1✔
281
      builder.setListener(ldsUpdate);
1✔
282
      if (activeVirtualHost == null) {
1✔
283
        activeVirtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
284
            ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority);
1✔
285
      }
286

287
      if (ldsUpdate.httpConnectionManager() != null
1✔
288
          && ldsUpdate.httpConnectionManager().virtualHosts() != null) {
1✔
289
        RdsUpdate rdsUpdate = new RdsUpdate(ldsUpdate.httpConnectionManager().virtualHosts());
1✔
290
        builder.setRoute(rdsUpdate);
1✔
291
      }
292
    }
1✔
293

294
    resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
1✔
295
        .map(watcher -> (RdsWatcher) watcher)
1✔
296
        .forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
1✔
297

298
    builder.setVirtualHost(activeVirtualHost);
1✔
299

300
    Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
1✔
301
        resourceWatchers.get(ENDPOINT_RESOURCE).watchers;
1✔
302
    Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
1✔
303
        resourceWatchers.get(CLUSTER_RESOURCE).watchers;
1✔
304

305
    // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
306
    List<String> topLevelClusters =
1✔
307
        cdsWatchers.values().stream()
1✔
308
            .filter(XdsDependencyManager::isTopLevelCluster)
1✔
309
            .map(w -> w.resourceName())
1✔
310
            .collect(Collectors.toList());
1✔
311

312
    // Flatten multi-level aggregates into lists of leaf clusters
313
    Set<String> leafNames =
1✔
314
        addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
1✔
315

316
    addLeavesToBuilder(builder, edsWatchers, leafNames);
1✔
317

318
    return builder.build();
1✔
319
  }
320

321
  private void addLeavesToBuilder(XdsConfig.XdsConfigBuilder builder,
322
                                  Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
323
                                  Set<String> leafNames) {
324
    for (String clusterName : leafNames) {
1✔
325
      CdsWatcher cdsWatcher = getCluster(clusterName);
1✔
326
      StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
1✔
327

328
      if (cdsUpdateOr.hasValue()) {
1✔
329
        XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
1✔
330
        if (cdsUpdate.clusterType() == ClusterType.EDS) {
1✔
331
          EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
1✔
332
          if (edsWatcher != null) {
1✔
333
            EndpointConfig child = new EndpointConfig(edsWatcher.getData());
1✔
334
            builder.addCluster(clusterName, StatusOr.fromValue(
1✔
335
                new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
336
          } else {
1✔
337
            builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
338
                "EDS resource not found for cluster " + clusterName)));
339
          }
340
        } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
1✔
341
          // TODO get the resolved endpoint configuration
342
          builder.addCluster(clusterName, StatusOr.fromValue(
×
343
              new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, new EndpointConfig(null))));
344
        }
345
      } else {
1✔
346
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
1✔
347
      }
348
    }
1✔
349
  }
1✔
350

351
  // Adds the top-level clusters to the builder and returns the leaf cluster names
352
  private Set<String> addTopLevelClustersToBuilder(
353
      XdsConfig.XdsConfigBuilder builder, Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
354
      Map<String, ? extends XdsWatcherBase<?>> cdsWatchers, List<String> topLevelClusters) {
355

356
    Set<String> leafClusterNames = new HashSet<>();
1✔
357
    for (String clusterName : topLevelClusters) {
1✔
358
      CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
1✔
359
      StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
360
      if (!cdsWatcher.hasDataValue()) {
1✔
361
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
362
        continue;
1✔
363
      }
364

365
      XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
366
      XdsConfig.XdsClusterConfig.ClusterChild child;
367
      switch (cdsUpdate.clusterType()) {
1✔
368
        case AGGREGATE:
369
          List<String> leafNames = getLeafNames(cdsUpdate);
1✔
370
          child = new AggregateConfig(leafNames);
1✔
371
          leafClusterNames.addAll(leafNames);
1✔
372
          break;
1✔
373
        case EDS:
374
          EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
1✔
375
          if (edsWatcher != null) {
1✔
376
            child = new EndpointConfig(edsWatcher.getData());
1✔
377
          } else {
378
            builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
379
                "EDS resource not found for cluster " + clusterName)));
380
            continue;
×
381
          }
382
          break;
383
        case LOGICAL_DNS:
384
          // TODO get the resolved endpoint configuration
385
          child = new EndpointConfig(null);
×
386
          break;
×
387
        default:
388
          throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
389
      }
390
      builder.addCluster(clusterName, StatusOr.fromValue(
1✔
391
          new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
392
    }
1✔
393

394
    return leafClusterNames;
1✔
395
  }
396

397
  private List<String> getLeafNames(XdsClusterResource.CdsUpdate cdsUpdate) {
398
    List<String> childNames = new ArrayList<>();
1✔
399

400
    for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
401
      StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
1✔
402
      if (data == null || !data.hasValue() || data.getValue() == null) {
1✔
403
        childNames.add(cluster);
1✔
404
        continue;
1✔
405
      }
406
      if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
1✔
407
        childNames.addAll(getLeafNames(data.getValue()));
1✔
408
      } else {
409
        childNames.add(cluster);
1✔
410
      }
411
    }
1✔
412

413
    return childNames;
1✔
414
  }
415

416
  private static boolean isTopLevelCluster(XdsWatcherBase<?> cdsWatcher) {
417
    if (! (cdsWatcher instanceof CdsWatcher)) {
1✔
418
      return false;
×
419
    }
420
    return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
1✔
421
        .anyMatch(depth -> depth == 1);
1✔
422
  }
423

424
  @Override
425
  public String toString() {
426
    return logId.toString();
×
427
  }
428

429
  // Returns true if the watcher was added, false if it already exists
430
  private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
431
    TypeWatchers<?> typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance());
1✔
432
    if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) {
1✔
433
      addWatcher(new EdsWatcher(edsServiceName, parentContext));
1✔
434
      return true;
1✔
435
    }
436

437
    EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName);
1✔
438
    watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
1✔
439
    return false;
1✔
440
  }
441

442
  private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
443
    TypeWatchers<?> clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
1✔
444
    if (clusterWatchers != null) {
1✔
445
      CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName);
1✔
446
      if (watcher != null) {
1✔
447
        watcher.parentContexts.put(parentContext, depth);
1✔
448
        return;
1✔
449
      }
450
    }
451

452
    addWatcher(new CdsWatcher(clusterName, parentContext, depth));
1✔
453
  }
1✔
454

455
  private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
456
                            VirtualHost oldVirtualHost, boolean sameParentContext) {
457
    VirtualHost virtualHost =
1✔
458
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
459
    if (virtualHost == null) {
1✔
460
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
461
      logger.log(XdsLogger.XdsLogLevel.WARNING, error);
1✔
462
      cleanUpRoutes();
1✔
463
      xdsConfigWatcher.onError(
1✔
464
          "xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error));
1✔
465
      return;
1✔
466
    }
467

468
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
469
    Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
1✔
470

471
    if (sameParentContext) {
1✔
472
      // Calculate diffs.
473
      Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
1✔
474
      Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
1✔
475

476
      deletedClusters.forEach(watcher ->
1✔
477
          cancelClusterWatcherTree(getCluster(watcher), newParentContext));
1✔
478
      addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
479
    } else {
1✔
480
      newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
481
    }
482
  }
1✔
483

484
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
485
    if (virtualHost == null) {
1✔
486
      return Collections.emptySet();
1✔
487
    }
488

489
    // Get all cluster names to which requests can be routed through the virtual host.
490
    Set<String> clusters = new HashSet<>();
1✔
491
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
492
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
493
      if (action == null) {
1✔
494
        continue;
×
495
      }
496
      if (action.cluster() != null) {
1✔
497
        clusters.add(action.cluster());
1✔
498
      } else if (action.weightedClusters() != null) {
×
499
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
×
500
          clusters.add(weighedCluster.name());
×
501
        }
×
502
      }
503
    }
1✔
504

505
    return clusters;
1✔
506
  }
507

508
  @Nullable
509
  private VirtualHost getActiveVirtualHost() {
510
    TypeWatchers<?> rdsWatchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
511
    if (rdsWatchers == null) {
1✔
512
      return null;
1✔
513
    }
514

515
    RdsWatcher activeRdsWatcher =
1✔
516
        (RdsWatcher) rdsWatchers.watchers.values().stream().findFirst().orElse(null);
1✔
517
    if (activeRdsWatcher == null || activeRdsWatcher.missingResult()
1✔
518
        || !activeRdsWatcher.getData().hasValue()) {
1✔
519
      return null;
1✔
520
    }
521

522
    return RoutingUtils.findVirtualHostForHostName(
1✔
523
        activeRdsWatcher.getData().getValue().virtualHosts, dataPlaneAuthority);
1✔
524
  }
525

526
  // Must be in SyncContext
527
  private void cleanUpRoutes() {
528
    // Remove RdsWatcher & CDS Watchers
529
    TypeWatchers<?> rdsResourceWatcher =
1✔
530
        resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
531
    if (rdsResourceWatcher == null || rdsResourceWatcher.watchers.isEmpty()) {
1✔
532
      return;
1✔
533
    }
534

535
    XdsWatcherBase<?> watcher = rdsResourceWatcher.watchers.values().stream().findFirst().get();
×
536
    cancelWatcher(watcher);
×
537

538
    // Remove CdsWatchers pointed to by the RdsWatcher
539
    RdsWatcher rdsWatcher = (RdsWatcher) watcher;
×
540
    for (String cName : rdsWatcher.getCdsNames()) {
×
541
      CdsWatcher cdsWatcher = getCluster(cName);
×
542
      if (cdsWatcher != null) {
×
543
        cancelClusterWatcherTree(cdsWatcher, rdsWatcher);
×
544
      }
545
    }
×
546
  }
×
547

548
  private CdsWatcher getCluster(String clusterName) {
549
    return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
550
  }
551

552
  private static class TypeWatchers<T extends ResourceUpdate> {
553
    // Key is resource name
554
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
555
    final XdsResourceType<T> resourceType;
556

557
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
558
      this.resourceType = resourceType;
1✔
559
    }
1✔
560

561
    public void add(String resourceName, XdsWatcherBase<T> watcher) {
562
      watchers.put(resourceName, watcher);
1✔
563
    }
1✔
564
  }
565

566
  public interface XdsConfigWatcher {
567

568
    void onUpdate(XdsConfig config);
569

570
    // These 2 methods are invoked when there is an error or
571
    // does-not-exist on LDS or RDS only.  The context will be a
572
    // human-readable string indicating the scope in which the error
573
    // occurred (e.g., the resource type and name).
574
    void onError(String resourceContext, Status status);
575

576
    void onResourceDoesNotExist(String resourceContext);
577
  }
578

579
  private class ClusterSubscription implements Closeable {
580
    String clusterName;
581

582
    public ClusterSubscription(String clusterName) {
1✔
583
      this.clusterName = clusterName;
1✔
584
    }
1✔
585

586
    public String getClusterName() {
587
      return clusterName;
1✔
588
    }
589

590
    @Override
591
    public void close() throws IOException {
592
      releaseSubscription(this);
1✔
593
    }
1✔
594
  }
595

596
  private abstract static class XdsWatcherBase<T extends ResourceUpdate>
597
      implements ResourceWatcher<T> {
598
    private final XdsResourceType<T> type;
599
    private final String resourceName;
600
    @Nullable
601
    private StatusOr<T> data;
602

603

604
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
605
      this.type = checkNotNull(type, "type");
1✔
606
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
607
    }
1✔
608

609
    @Override
610
    public void onError(Status error) {
611
      checkNotNull(error, "error");
1✔
612
      setDataAsStatus(error);
1✔
613
    }
1✔
614

615
    protected void handleDoesNotExist(String resourceName) {
616
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
617
      setDataAsStatus(Status.UNAVAILABLE.withDescription("No " + toContextString()));
1✔
618
    }
1✔
619

620
    boolean missingResult() {
621
      return data == null;
1✔
622
    }
623

624
    @Nullable
625
    StatusOr<T> getData() {
626
      return data;
1✔
627
    }
628

629
    boolean hasDataValue() {
630
      return data != null && data.hasValue();
1✔
631
    }
632

633
    String resourceName() {
634
      return resourceName;
1✔
635
    }
636

637
    protected void setData(T data) {
638
      checkNotNull(data, "data");
1✔
639
      this.data = StatusOr.fromValue(data);
1✔
640
    }
1✔
641

642
    protected void setDataAsStatus(Status status) {
643
      checkNotNull(status, "status");
1✔
644
      this.data = StatusOr.fromStatus(status);
1✔
645
    }
1✔
646

647
    String toContextString() {
648
      return toContextStr(type.typeName(), resourceName);
1✔
649
    }
650
  }
651

652
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate> {
653
    String rdsName;
654

655
    private LdsWatcher(String resourceName) {
1✔
656
      super(XdsListenerResource.getInstance(), resourceName);
1✔
657
    }
1✔
658

659
    @Override
660
    public void onChanged(XdsListenerResource.LdsUpdate update) {
661
      checkNotNull(update, "update");
1✔
662

663
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
664
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
665
      String rdsName = httpConnectionManager.rdsName();
1✔
666
      VirtualHost activeVirtualHost = getActiveVirtualHost();
1✔
667

668
      boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
1✔
669
      if (changedRdsName) {
1✔
670
        cleanUpRdsWatcher();
1✔
671
      }
672

673
      if (virtualHosts != null) {
1✔
674
        // No RDS watcher since we are getting RDS updates via LDS
675
        updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
1✔
676
        this.rdsName = null;
1✔
677
      } else if (changedRdsName) {
1✔
678
        cleanUpRdsWatcher();
1✔
679
        this.rdsName = rdsName;
1✔
680
        addWatcher(new RdsWatcher(rdsName));
1✔
681
        logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
682
      }
683

684
      setData(update);
1✔
685
      maybePublishConfig();
1✔
686
    }
1✔
687

688
    @Override
689
    public void onError(Status error) {
690
      super.onError(checkNotNull(error, "error"));
1✔
691
      xdsConfigWatcher.onError(toContextString(), error);
1✔
692
    }
1✔
693

694
    @Override
695
    public void onResourceDoesNotExist(String resourceName) {
696
      handleDoesNotExist(resourceName);
1✔
697
      xdsConfigWatcher.onResourceDoesNotExist(toContextString());
1✔
698
    }
1✔
699

700
    private void cleanUpRdsWatcher() {
701
      RdsWatcher oldRdsWatcher = getRdsWatcher();
1✔
702
      if (oldRdsWatcher != null) {
1✔
703
        cancelWatcher(oldRdsWatcher);
1✔
704
        logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
1✔
705

706
        // Cleanup clusters (as appropriate) that had the old rds watcher as a parent
707
        if (!oldRdsWatcher.hasDataValue() || !oldRdsWatcher.getData().hasValue()
1✔
708
            || resourceWatchers.get(CLUSTER_RESOURCE) == null) {
1✔
709
          return;
×
710
        }
711
        for (XdsWatcherBase<?> watcher :
712
            resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) {
1✔
713
          cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
1✔
714
        }
1✔
715
      }
716
    }
1✔
717

718
    private RdsWatcher getRdsWatcher() {
719
      TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
720
      if (watchers == null || rdsName == null || watchers.watchers.isEmpty()) {
1✔
721
        return null;
1✔
722
      }
723

724
      return (RdsWatcher) watchers.watchers.get(rdsName);
1✔
725
    }
726
  }
727

728
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> {
729

730
    public RdsWatcher(String resourceName) {
1✔
731
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
732
    }
1✔
733

734
    @Override
735
    public void onChanged(RdsUpdate update) {
736
      checkNotNull(update, "update");
1✔
737
      RdsUpdate oldData = hasDataValue() ? getData().getValue() : null;
1✔
738
      VirtualHost oldVirtualHost =
739
          (oldData != null)
1✔
740
          ? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority)
1✔
741
          : null;
1✔
742
      setData(update);
1✔
743
      updateRoutes(update.virtualHosts, this, oldVirtualHost, true);
1✔
744
      maybePublishConfig();
1✔
745
    }
1✔
746

747
    @Override
748
    public void onError(Status error) {
749
      super.onError(checkNotNull(error, "error"));
×
750
      xdsConfigWatcher.onError(toContextString(), error);
×
751
    }
×
752

753
    @Override
754
    public void onResourceDoesNotExist(String resourceName) {
755
      handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
1✔
756
      xdsConfigWatcher.onResourceDoesNotExist(toContextString());
1✔
757
    }
1✔
758

759
    ImmutableList<String> getCdsNames() {
760
      if (!hasDataValue() || getData().getValue().virtualHosts == null) {
×
761
        return ImmutableList.of();
×
762
      }
763

764
      return ImmutableList.copyOf(getClusterNamesFromVirtualHost(getActiveVirtualHost()));
×
765
    }
766
  }
767

768
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
769
    Map<Object, Integer> parentContexts = new HashMap<>();
1✔
770

771
    CdsWatcher(String resourceName, Object parentContext, int depth) {
1✔
772
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
773
      this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
1✔
774
    }
1✔
775

776
    @Override
777
    public void onChanged(XdsClusterResource.CdsUpdate update) {
778
      checkNotNull(update, "update");
1✔
779
      switch (update.clusterType()) {
1✔
780
        case EDS:
781
          setData(update);
1✔
782
          if (!addEdsWatcher(update.edsServiceName(), this))  {
1✔
783
            maybePublishConfig();
1✔
784
          }
785
          break;
786
        case LOGICAL_DNS:
787
          setData(update);
×
788
          maybePublishConfig();
×
789
          // no eds needed
790
          break;
×
791
        case AGGREGATE:
792
          Object parentContext = this;
1✔
793
          int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
1✔
794
          if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
795
            logger.log(XdsLogger.XdsLogLevel.WARNING,
×
796
                "Cluster recursion depth limit exceeded for cluster {0}", resourceName());
×
797
            Status error = Status.UNAVAILABLE.withDescription(
×
798
                "aggregate cluster graph exceeds max depth");
799
            setDataAsStatus(error);
×
800
          }
801
          if (hasDataValue()) {
1✔
802
            Set<String> oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames());
1✔
803
            Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
1✔
804

805

806
            Set<String> deletedClusters = Sets.difference(oldNames, newNames);
1✔
807
            deletedClusters.forEach((cluster)
1✔
808
                -> cancelClusterWatcherTree(getCluster(cluster), parentContext));
1✔
809

810
            if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
811
              setData(update);
1✔
812
              Set<String> addedClusters = Sets.difference(newNames, oldNames);
1✔
813
              addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
1✔
814

815
              if (addedClusters.isEmpty()) {
1✔
816
                maybePublishConfig();
×
817
              }
818
            } else { // data was set to error status above
1✔
819
              maybePublishConfig();
×
820
            }
821

822
          } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
823
            setData(update);
1✔
824
            update.prioritizedClusterNames()
1✔
825
                .forEach(name -> addClusterWatcher(name, parentContext, depth));
1✔
826
            maybePublishConfig();
1✔
827
          }
828
          break;
829
        default:
830
          Status error = Status.UNAVAILABLE.withDescription(
×
831
              "aggregate cluster graph exceeds max depth");
832
          setDataAsStatus(error);
×
833
          maybePublishConfig();
×
834
      }
835
    }
1✔
836

837
    @Override
838
    public void onResourceDoesNotExist(String resourceName) {
839
      handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
1✔
840
      maybePublishConfig();
1✔
841
    }
1✔
842
  }
843

844
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
845
    private final Set<CdsWatcher> parentContexts = new HashSet<>();
1✔
846

847
    private EdsWatcher(String resourceName, CdsWatcher parentContext) {
1✔
848
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
849
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
850
    }
1✔
851

852
    @Override
853
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
854
      setData(checkNotNull(update, "update"));
1✔
855
      maybePublishConfig();
1✔
856
    }
1✔
857

858
    @Override
859
    public void onResourceDoesNotExist(String resourceName) {
860
      handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
1✔
861
      maybePublishConfig();
1✔
862
    }
1✔
863

864
    void addParentContext(CdsWatcher parentContext) {
865
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
866
    }
1✔
867
  }
868
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc