• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19691

14 Feb 2025 06:23PM CUT coverage: 88.603% (-0.02%) from 88.626%
#19691

push

github

web-flow
xds:Cleanup to reduce test flakiness (#11895)

* don't process resourceDoesNotExist for watchers that have been cancelled.

* Change test to use an ArgumentMatcher instead of expecting that only the final result will be sent since depending on timing there may be configs sent for clusters being removed with their entries as errors.

34261 of 38668 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.98
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.Sets;
27
import io.grpc.InternalLogId;
28
import io.grpc.Status;
29
import io.grpc.StatusOr;
30
import io.grpc.SynchronizationContext;
31
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
32
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
34
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
35
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
36
import io.grpc.xds.client.XdsClient;
37
import io.grpc.xds.client.XdsClient.ResourceWatcher;
38
import io.grpc.xds.client.XdsLogger;
39
import io.grpc.xds.client.XdsResourceType;
40
import java.io.Closeable;
41
import java.io.IOException;
42
import java.util.ArrayList;
43
import java.util.Collections;
44
import java.util.HashMap;
45
import java.util.HashSet;
46
import java.util.List;
47
import java.util.Map;
48
import java.util.Objects;
49
import java.util.Set;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
55
 * maintains the watchers for the xds resources and when an update is received, it either requests
56
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
57
 * applies to a single data plane authority.
58
 */
59
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
60
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
61
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
62
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
63
  private final XdsClient xdsClient;
64
  private final XdsConfigWatcher xdsConfigWatcher;
65
  private final SynchronizationContext syncContext;
66
  private final String dataPlaneAuthority;
67

68
  private final InternalLogId logId;
69
  private final XdsLogger logger;
70
  private XdsConfig lastXdsConfig = null;
1✔
71
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
72

73
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
74
                       SynchronizationContext syncContext, String dataPlaneAuthority,
75
                       String listenerName) {
1✔
76
    logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
1✔
77
    logger = XdsLogger.withLogId(logId);
1✔
78
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
79
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
80
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
81
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
82

83
    // start the ball rolling
84
    syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
1✔
85
  }
1✔
86

87
  public static String  toContextStr(String typeName, String resourceName) {
88
    return typeName + " resource: " + resourceName;
1✔
89
  }
90

91
  @Override
92
  public Closeable subscribeToCluster(String clusterName) {
93

94
    checkNotNull(clusterName, "clusterName");
1✔
95
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
96

97
    syncContext.execute(() -> {
1✔
98
      addClusterWatcher(clusterName, subscription, 1);
1✔
99
      maybePublishConfig();
1✔
100
    });
1✔
101

102
    return subscription;
1✔
103
  }
104

105
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
106
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
107
    XdsResourceType<T> type = watcher.type;
1✔
108
    String resourceName = watcher.resourceName;
1✔
109

110
    @SuppressWarnings("unchecked")
111
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
112
    if (typeWatchers == null) {
1✔
113
      typeWatchers = new TypeWatchers<>(type);
1✔
114
      resourceWatchers.put(type, typeWatchers);
1✔
115
    }
116

117
    typeWatchers.add(resourceName, watcher);
1✔
118
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
119
  }
1✔
120

121
  private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
122
    if (watcher == null) {
1✔
123
      return;
×
124
    }
125
    watcher.parentContexts.remove(parentContext);
1✔
126
    if (watcher.parentContexts.isEmpty()) {
1✔
127
      cancelWatcher(watcher);
1✔
128
    }
129
  }
1✔
130

131
  private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
132
    if (watcher == null) {
1✔
133
      return;
×
134
    }
135
    watcher.parentContexts.remove(parentContext);
1✔
136
    if (watcher.parentContexts.isEmpty()) {
1✔
137
      cancelWatcher(watcher);
1✔
138
    }
139
  }
1✔
140

141
  private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
142
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
143

144
    if (watcher == null) {
1✔
145
      return;
×
146
    }
147

148
    if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
1✔
149
      throwIfParentContextsNotEmpty(watcher);
1✔
150
    }
151

152
    watcher.cancelled = true;
1✔
153
    XdsResourceType<T> type = watcher.type;
1✔
154
    String resourceName = watcher.resourceName;
1✔
155

156
    @SuppressWarnings("unchecked")
157
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
158
    if (typeWatchers == null) {
1✔
159
      logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
×
160
      return;
×
161
    }
162

163
    typeWatchers.watchers.remove(resourceName);
1✔
164
    xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
1✔
165

166
  }
1✔
167

168
  private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
169
    if (watcher instanceof CdsWatcher) {
1✔
170
      CdsWatcher cdsWatcher = (CdsWatcher) watcher;
1✔
171
      if (!cdsWatcher.parentContexts.isEmpty()) {
1✔
172
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
173
            cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
×
174
        throw new IllegalStateException(msg);
×
175
      }
176
    } else if (watcher instanceof EdsWatcher) {
1✔
177
      EdsWatcher edsWatcher = (EdsWatcher) watcher;
1✔
178
      if (!edsWatcher.parentContexts.isEmpty()) {
1✔
179
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
180
            edsWatcher.resourceName(), edsWatcher.parentContexts);
×
181
        throw new IllegalStateException(msg);
×
182
      }
183
    }
184
  }
1✔
185

186
  public void shutdown() {
187
    syncContext.execute(() -> {
1✔
188
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
189
        shutdownWatchersForType(watchers);
1✔
190
      }
1✔
191
      resourceWatchers.clear();
1✔
192
    });
1✔
193
  }
1✔
194

195
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
196
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
197
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
198
          watcherEntry.getValue());
1✔
199
    }
1✔
200
  }
1✔
201

202
  private void releaseSubscription(ClusterSubscription subscription) {
203
    checkNotNull(subscription, "subscription");
1✔
204
    String clusterName = subscription.getClusterName();
1✔
205
    syncContext.execute(() -> {
1✔
206
      XdsWatcherBase<?> cdsWatcher =
1✔
207
          resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
208
      if (cdsWatcher == null) {
1✔
209
        return; // already released while waiting for the syncContext
×
210
      }
211
      cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
1✔
212
      maybePublishConfig();
1✔
213
    });
1✔
214
  }
1✔
215

216
  private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
217
    checkNotNull(root, "root");
1✔
218

219
    cancelCdsWatcher(root, parentContext);
1✔
220

221
    if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
1✔
222
      return;
1✔
223
    }
224

225
    XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
1✔
226
    switch (cdsUpdate.clusterType()) {
1✔
227
      case EDS:
228
        String edsServiceName = cdsUpdate.edsServiceName();
1✔
229
        EdsWatcher edsWatcher =
1✔
230
            (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
1✔
231
        cancelEdsWatcher(edsWatcher, root);
1✔
232
        break;
1✔
233
      case AGGREGATE:
234
        for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
235
          CdsWatcher clusterWatcher =
1✔
236
              (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
1✔
237
          if (clusterWatcher != null) {
1✔
238
            cancelClusterWatcherTree(clusterWatcher, root);
1✔
239
          }
240
        }
1✔
241
        break;
1✔
242
      case LOGICAL_DNS:
243
        // no eds needed
244
        break;
×
245
      default:
246
        throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
×
247
    }
248
  }
1✔
249

250
  /**
251
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
252
   * the watchers.
253
   */
254
  private void maybePublishConfig() {
255
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
256
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
257
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
258
        .anyMatch(XdsWatcherBase::missingResult);
1✔
259
    if (waitingOnResource) {
1✔
260
      return;
1✔
261
    }
262

263
    XdsConfig newConfig = buildConfig();
1✔
264
    if (Objects.equals(newConfig, lastXdsConfig)) {
1✔
265
      return;
1✔
266
    }
267
    lastXdsConfig = newConfig;
1✔
268
    xdsConfigWatcher.onUpdate(lastXdsConfig);
1✔
269
  }
1✔
270

271
  @VisibleForTesting
272
  XdsConfig buildConfig() {
273
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
274

275
    // Iterate watchers and build the XdsConfig
276

277
    // Will only be 1 listener and 1 route resource
278
    VirtualHost activeVirtualHost = getActiveVirtualHost();
1✔
279
    for (XdsWatcherBase<?> xdsWatcherBase :
280
        resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) {
1✔
281
      XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue();
1✔
282
      builder.setListener(ldsUpdate);
1✔
283
      if (activeVirtualHost == null) {
1✔
284
        activeVirtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
285
            ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority);
1✔
286
      }
287

288
      if (ldsUpdate.httpConnectionManager() != null
1✔
289
          && ldsUpdate.httpConnectionManager().virtualHosts() != null) {
1✔
290
        RdsUpdate rdsUpdate = new RdsUpdate(ldsUpdate.httpConnectionManager().virtualHosts());
1✔
291
        builder.setRoute(rdsUpdate);
1✔
292
      }
293
    }
1✔
294

295
    resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
1✔
296
        .map(watcher -> (RdsWatcher) watcher)
1✔
297
        .forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
1✔
298

299
    builder.setVirtualHost(activeVirtualHost);
1✔
300

301
    Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
1✔
302
        resourceWatchers.get(ENDPOINT_RESOURCE).watchers;
1✔
303
    Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
1✔
304
        resourceWatchers.get(CLUSTER_RESOURCE).watchers;
1✔
305

306
    // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
307
    List<String> topLevelClusters =
1✔
308
        cdsWatchers.values().stream()
1✔
309
            .filter(XdsDependencyManager::isTopLevelCluster)
1✔
310
            .map(w -> w.resourceName())
1✔
311
            .collect(Collectors.toList());
1✔
312

313
    // Flatten multi-level aggregates into lists of leaf clusters
314
    Set<String> leafNames =
1✔
315
        addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
1✔
316

317
    addLeavesToBuilder(builder, edsWatchers, leafNames);
1✔
318

319
    return builder.build();
1✔
320
  }
321

322
  private void addLeavesToBuilder(XdsConfig.XdsConfigBuilder builder,
323
                                  Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
324
                                  Set<String> leafNames) {
325
    for (String clusterName : leafNames) {
1✔
326
      CdsWatcher cdsWatcher = getCluster(clusterName);
1✔
327
      StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
1✔
328

329
      if (cdsUpdateOr.hasValue()) {
1✔
330
        XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
1✔
331
        if (cdsUpdate.clusterType() == ClusterType.EDS) {
1✔
332
          EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
1✔
333
          if (edsWatcher != null) {
1✔
334
            EndpointConfig child = new EndpointConfig(edsWatcher.getData());
1✔
335
            builder.addCluster(clusterName, StatusOr.fromValue(
1✔
336
                new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
337
          } else {
1✔
338
            builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
339
                "EDS resource not found for cluster " + clusterName)));
340
          }
341
        } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
1✔
342
          // TODO get the resolved endpoint configuration
343
          builder.addCluster(clusterName, StatusOr.fromValue(
×
344
              new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, new EndpointConfig(null))));
345
        }
346
      } else {
1✔
347
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
1✔
348
      }
349
    }
1✔
350
  }
1✔
351

352
  // Adds the top-level clusters to the builder and returns the leaf cluster names
353
  private Set<String> addTopLevelClustersToBuilder(
354
      XdsConfig.XdsConfigBuilder builder, Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
355
      Map<String, ? extends XdsWatcherBase<?>> cdsWatchers, List<String> topLevelClusters) {
356

357
    Set<String> leafClusterNames = new HashSet<>();
1✔
358
    for (String clusterName : topLevelClusters) {
1✔
359
      CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
1✔
360
      StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
361
      if (!cdsWatcher.hasDataValue()) {
1✔
362
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
363
        continue;
1✔
364
      }
365

366
      XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
367
      XdsConfig.XdsClusterConfig.ClusterChild child;
368
      switch (cdsUpdate.clusterType()) {
1✔
369
        case AGGREGATE:
370
          List<String> leafNames = getLeafNames(cdsUpdate);
1✔
371
          child = new AggregateConfig(leafNames);
1✔
372
          leafClusterNames.addAll(leafNames);
1✔
373
          break;
1✔
374
        case EDS:
375
          EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
1✔
376
          if (edsWatcher != null) {
1✔
377
            child = new EndpointConfig(edsWatcher.getData());
1✔
378
          } else {
379
            builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
380
                "EDS resource not found for cluster " + clusterName)));
381
            continue;
×
382
          }
383
          break;
384
        case LOGICAL_DNS:
385
          // TODO get the resolved endpoint configuration
386
          child = new EndpointConfig(null);
×
387
          break;
×
388
        default:
389
          throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
390
      }
391
      builder.addCluster(clusterName, StatusOr.fromValue(
1✔
392
          new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
393
    }
1✔
394

395
    return leafClusterNames;
1✔
396
  }
397

398
  private List<String> getLeafNames(XdsClusterResource.CdsUpdate cdsUpdate) {
399
    List<String> childNames = new ArrayList<>();
1✔
400

401
    for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
402
      StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
1✔
403
      if (data == null || !data.hasValue() || data.getValue() == null) {
1✔
404
        childNames.add(cluster);
1✔
405
        continue;
1✔
406
      }
407
      if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
1✔
408
        childNames.addAll(getLeafNames(data.getValue()));
1✔
409
      } else {
410
        childNames.add(cluster);
1✔
411
      }
412
    }
1✔
413

414
    return childNames;
1✔
415
  }
416

417
  private static boolean isTopLevelCluster(XdsWatcherBase<?> cdsWatcher) {
418
    if (! (cdsWatcher instanceof CdsWatcher)) {
1✔
419
      return false;
×
420
    }
421
    return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
1✔
422
        .anyMatch(depth -> depth == 1);
1✔
423
  }
424

425
  @Override
426
  public String toString() {
427
    return logId.toString();
×
428
  }
429

430
  // Returns true if the watcher was added, false if it already exists
431
  private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
432
    TypeWatchers<?> typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance());
1✔
433
    if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) {
1✔
434
      addWatcher(new EdsWatcher(edsServiceName, parentContext));
1✔
435
      return true;
1✔
436
    }
437

438
    EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName);
1✔
439
    watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
1✔
440
    return false;
1✔
441
  }
442

443
  private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
444
    TypeWatchers<?> clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
1✔
445
    if (clusterWatchers != null) {
1✔
446
      CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName);
1✔
447
      if (watcher != null) {
1✔
448
        watcher.parentContexts.put(parentContext, depth);
1✔
449
        return;
1✔
450
      }
451
    }
452

453
    addWatcher(new CdsWatcher(clusterName, parentContext, depth));
1✔
454
  }
1✔
455

456
  private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
457
                            VirtualHost oldVirtualHost, boolean sameParentContext) {
458
    VirtualHost virtualHost =
1✔
459
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
460
    if (virtualHost == null) {
1✔
461
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
462
      logger.log(XdsLogger.XdsLogLevel.WARNING, error);
1✔
463
      cleanUpRoutes();
1✔
464
      xdsConfigWatcher.onError(
1✔
465
          "xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error));
1✔
466
      return;
1✔
467
    }
468

469
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
470
    Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
1✔
471

472
    if (sameParentContext) {
1✔
473
      // Calculate diffs.
474
      Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
1✔
475
      Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
1✔
476

477
      deletedClusters.forEach(watcher ->
1✔
478
          cancelClusterWatcherTree(getCluster(watcher), newParentContext));
1✔
479
      addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
480
    } else {
1✔
481
      newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
482
    }
483
  }
1✔
484

485
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
486
    if (virtualHost == null) {
1✔
487
      return Collections.emptySet();
1✔
488
    }
489

490
    // Get all cluster names to which requests can be routed through the virtual host.
491
    Set<String> clusters = new HashSet<>();
1✔
492
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
493
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
494
      if (action == null) {
1✔
495
        continue;
×
496
      }
497
      if (action.cluster() != null) {
1✔
498
        clusters.add(action.cluster());
1✔
499
      } else if (action.weightedClusters() != null) {
×
500
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
×
501
          clusters.add(weighedCluster.name());
×
502
        }
×
503
      }
504
    }
1✔
505

506
    return clusters;
1✔
507
  }
508

509
  @Nullable
510
  private VirtualHost getActiveVirtualHost() {
511
    TypeWatchers<?> rdsWatchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
512
    if (rdsWatchers == null) {
1✔
513
      return null;
1✔
514
    }
515

516
    RdsWatcher activeRdsWatcher =
1✔
517
        (RdsWatcher) rdsWatchers.watchers.values().stream().findFirst().orElse(null);
1✔
518
    if (activeRdsWatcher == null || activeRdsWatcher.missingResult()
1✔
519
        || !activeRdsWatcher.getData().hasValue()) {
1✔
520
      return null;
1✔
521
    }
522

523
    return RoutingUtils.findVirtualHostForHostName(
1✔
524
        activeRdsWatcher.getData().getValue().virtualHosts, dataPlaneAuthority);
1✔
525
  }
526

527
  // Must be in SyncContext
528
  private void cleanUpRoutes() {
529
    // Remove RdsWatcher & CDS Watchers
530
    TypeWatchers<?> rdsResourceWatcher =
1✔
531
        resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
532
    if (rdsResourceWatcher == null || rdsResourceWatcher.watchers.isEmpty()) {
1✔
533
      return;
1✔
534
    }
535

536
    XdsWatcherBase<?> watcher = rdsResourceWatcher.watchers.values().stream().findFirst().get();
×
537
    cancelWatcher(watcher);
×
538

539
    // Remove CdsWatchers pointed to by the RdsWatcher
540
    RdsWatcher rdsWatcher = (RdsWatcher) watcher;
×
541
    for (String cName : rdsWatcher.getCdsNames()) {
×
542
      CdsWatcher cdsWatcher = getCluster(cName);
×
543
      if (cdsWatcher != null) {
×
544
        cancelClusterWatcherTree(cdsWatcher, rdsWatcher);
×
545
      }
546
    }
×
547
  }
×
548

549
  private CdsWatcher getCluster(String clusterName) {
550
    return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
551
  }
552

553
  private static class TypeWatchers<T extends ResourceUpdate> {
554
    // Key is resource name
555
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
556
    final XdsResourceType<T> resourceType;
557

558
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
559
      this.resourceType = resourceType;
1✔
560
    }
1✔
561

562
    public void add(String resourceName, XdsWatcherBase<T> watcher) {
563
      watchers.put(resourceName, watcher);
1✔
564
    }
1✔
565
  }
566

567
  public interface XdsConfigWatcher {
568

569
    void onUpdate(XdsConfig config);
570

571
    // These 2 methods are invoked when there is an error or
572
    // does-not-exist on LDS or RDS only.  The context will be a
573
    // human-readable string indicating the scope in which the error
574
    // occurred (e.g., the resource type and name).
575
    void onError(String resourceContext, Status status);
576

577
    void onResourceDoesNotExist(String resourceContext);
578
  }
579

580
  private class ClusterSubscription implements Closeable {
581
    String clusterName;
582

583
    public ClusterSubscription(String clusterName) {
1✔
584
      this.clusterName = clusterName;
1✔
585
    }
1✔
586

587
    public String getClusterName() {
588
      return clusterName;
1✔
589
    }
590

591
    @Override
592
    public void close() throws IOException {
593
      releaseSubscription(this);
1✔
594
    }
1✔
595
  }
596

597
  private abstract static class XdsWatcherBase<T extends ResourceUpdate>
598
      implements ResourceWatcher<T> {
599
    private final XdsResourceType<T> type;
600
    private final String resourceName;
601
    boolean cancelled;
602

603
    @Nullable
604
    private StatusOr<T> data;
605

606

607
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
608
      this.type = checkNotNull(type, "type");
1✔
609
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
610
    }
1✔
611

612
    @Override
613
    public void onError(Status error) {
614
      checkNotNull(error, "error");
1✔
615
      setDataAsStatus(error);
1✔
616
    }
1✔
617

618
    protected void handleDoesNotExist(String resourceName) {
619
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
620
      setDataAsStatus(Status.UNAVAILABLE.withDescription("No " + toContextString()));
1✔
621
    }
1✔
622

623
    boolean missingResult() {
624
      return data == null;
1✔
625
    }
626

627
    @Nullable
628
    StatusOr<T> getData() {
629
      return data;
1✔
630
    }
631

632
    boolean hasDataValue() {
633
      return data != null && data.hasValue();
1✔
634
    }
635

636
    String resourceName() {
637
      return resourceName;
1✔
638
    }
639

640
    protected void setData(T data) {
641
      checkNotNull(data, "data");
1✔
642
      this.data = StatusOr.fromValue(data);
1✔
643
    }
1✔
644

645
    protected void setDataAsStatus(Status status) {
646
      checkNotNull(status, "status");
1✔
647
      this.data = StatusOr.fromStatus(status);
1✔
648
    }
1✔
649

650
    String toContextString() {
651
      return toContextStr(type.typeName(), resourceName);
1✔
652
    }
653
  }
654

655
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate> {
656
    String rdsName;
657

658
    private LdsWatcher(String resourceName) {
1✔
659
      super(XdsListenerResource.getInstance(), resourceName);
1✔
660
    }
1✔
661

662
    @Override
663
    public void onChanged(XdsListenerResource.LdsUpdate update) {
664
      checkNotNull(update, "update");
1✔
665

666
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
667
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
668
      String rdsName = httpConnectionManager.rdsName();
1✔
669
      VirtualHost activeVirtualHost = getActiveVirtualHost();
1✔
670

671
      boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
1✔
672
      if (changedRdsName) {
1✔
673
        cleanUpRdsWatcher();
1✔
674
      }
675

676
      if (virtualHosts != null) {
1✔
677
        // No RDS watcher since we are getting RDS updates via LDS
678
        updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
1✔
679
        this.rdsName = null;
1✔
680
      } else if (changedRdsName) {
1✔
681
        cleanUpRdsWatcher();
1✔
682
        this.rdsName = rdsName;
1✔
683
        addWatcher(new RdsWatcher(rdsName));
1✔
684
        logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
685
      }
686

687
      setData(update);
1✔
688
      maybePublishConfig();
1✔
689
    }
1✔
690

691
    @Override
692
    public void onError(Status error) {
693
      super.onError(checkNotNull(error, "error"));
1✔
694
      xdsConfigWatcher.onError(toContextString(), error);
1✔
695
    }
1✔
696

697
    @Override
698
    public void onResourceDoesNotExist(String resourceName) {
699
      if (cancelled) {
1✔
700
        return;
×
701
      }
702

703
      handleDoesNotExist(resourceName);
1✔
704
      xdsConfigWatcher.onResourceDoesNotExist(toContextString());
1✔
705
    }
1✔
706

707
    private void cleanUpRdsWatcher() {
708
      RdsWatcher oldRdsWatcher = getRdsWatcher();
1✔
709
      if (oldRdsWatcher != null) {
1✔
710
        cancelWatcher(oldRdsWatcher);
1✔
711
        logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
1✔
712

713
        // Cleanup clusters (as appropriate) that had the old rds watcher as a parent
714
        if (!oldRdsWatcher.hasDataValue() || !oldRdsWatcher.getData().hasValue()
1✔
715
            || resourceWatchers.get(CLUSTER_RESOURCE) == null) {
1✔
716
          return;
×
717
        }
718
        for (XdsWatcherBase<?> watcher :
719
            resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) {
1✔
720
          cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
1✔
721
        }
1✔
722
      }
723
    }
1✔
724

725
    private RdsWatcher getRdsWatcher() {
726
      TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
727
      if (watchers == null || rdsName == null || watchers.watchers.isEmpty()) {
1✔
728
        return null;
1✔
729
      }
730

731
      return (RdsWatcher) watchers.watchers.get(rdsName);
1✔
732
    }
733
  }
734

735
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> {
736

737
    public RdsWatcher(String resourceName) {
1✔
738
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
739
    }
1✔
740

741
    @Override
742
    public void onChanged(RdsUpdate update) {
743
      checkNotNull(update, "update");
1✔
744
      RdsUpdate oldData = hasDataValue() ? getData().getValue() : null;
1✔
745
      VirtualHost oldVirtualHost =
746
          (oldData != null)
1✔
747
          ? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority)
1✔
748
          : null;
1✔
749
      setData(update);
1✔
750
      updateRoutes(update.virtualHosts, this, oldVirtualHost, true);
1✔
751
      maybePublishConfig();
1✔
752
    }
1✔
753

754
    @Override
755
    public void onError(Status error) {
756
      super.onError(checkNotNull(error, "error"));
×
757
      xdsConfigWatcher.onError(toContextString(), error);
×
758
    }
×
759

760
    @Override
761
    public void onResourceDoesNotExist(String resourceName) {
762
      if (cancelled) {
1✔
763
        return;
×
764
      }
765
      handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
1✔
766
      xdsConfigWatcher.onResourceDoesNotExist(toContextString());
1✔
767
    }
1✔
768

769
    ImmutableList<String> getCdsNames() {
770
      if (!hasDataValue() || getData().getValue().virtualHosts == null) {
×
771
        return ImmutableList.of();
×
772
      }
773

774
      return ImmutableList.copyOf(getClusterNamesFromVirtualHost(getActiveVirtualHost()));
×
775
    }
776
  }
777

778
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
779
    Map<Object, Integer> parentContexts = new HashMap<>();
1✔
780

781
    CdsWatcher(String resourceName, Object parentContext, int depth) {
1✔
782
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
783
      this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
1✔
784
    }
1✔
785

786
    @Override
787
    public void onChanged(XdsClusterResource.CdsUpdate update) {
788
      checkNotNull(update, "update");
1✔
789
      switch (update.clusterType()) {
1✔
790
        case EDS:
791
          setData(update);
1✔
792
          if (!addEdsWatcher(update.edsServiceName(), this))  {
1✔
793
            maybePublishConfig();
1✔
794
          }
795
          break;
796
        case LOGICAL_DNS:
797
          setData(update);
×
798
          maybePublishConfig();
×
799
          // no eds needed
800
          break;
×
801
        case AGGREGATE:
802
          Object parentContext = this;
1✔
803
          int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
1✔
804
          if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
805
            logger.log(XdsLogger.XdsLogLevel.WARNING,
×
806
                "Cluster recursion depth limit exceeded for cluster {0}", resourceName());
×
807
            Status error = Status.UNAVAILABLE.withDescription(
×
808
                "aggregate cluster graph exceeds max depth");
809
            setDataAsStatus(error);
×
810
          }
811
          if (hasDataValue()) {
1✔
812
            Set<String> oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames());
1✔
813
            Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
1✔
814

815

816
            Set<String> deletedClusters = Sets.difference(oldNames, newNames);
1✔
817
            deletedClusters.forEach((cluster)
1✔
818
                -> cancelClusterWatcherTree(getCluster(cluster), parentContext));
1✔
819

820
            if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
821
              setData(update);
1✔
822
              Set<String> addedClusters = Sets.difference(newNames, oldNames);
1✔
823
              addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
1✔
824

825
              if (addedClusters.isEmpty()) {
1✔
826
                maybePublishConfig();
×
827
              }
828
            } else { // data was set to error status above
1✔
829
              maybePublishConfig();
×
830
            }
831

832
          } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
833
            setData(update);
1✔
834
            update.prioritizedClusterNames()
1✔
835
                .forEach(name -> addClusterWatcher(name, parentContext, depth));
1✔
836
            maybePublishConfig();
1✔
837
          }
838
          break;
839
        default:
840
          Status error = Status.UNAVAILABLE.withDescription(
×
841
              "aggregate cluster graph exceeds max depth");
842
          setDataAsStatus(error);
×
843
          maybePublishConfig();
×
844
      }
845
    }
1✔
846

847
    @Override
848
    public void onResourceDoesNotExist(String resourceName) {
849
      if (cancelled) {
1✔
850
        return;
1✔
851
      }
852
      handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
1✔
853
      maybePublishConfig();
1✔
854
    }
1✔
855
  }
856

857
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
858
    private final Set<CdsWatcher> parentContexts = new HashSet<>();
1✔
859

860
    private EdsWatcher(String resourceName, CdsWatcher parentContext) {
1✔
861
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
862
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
863
    }
1✔
864

865
    @Override
866
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
867
      setData(checkNotNull(update, "update"));
1✔
868
      maybePublishConfig();
1✔
869
    }
1✔
870

871
    @Override
872
    public void onResourceDoesNotExist(String resourceName) {
873
      if (cancelled) {
1✔
874
        return;
×
875
      }
876
      handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
1✔
877
      maybePublishConfig();
1✔
878
    }
1✔
879

880
    void addParentContext(CdsWatcher parentContext) {
881
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
882
    }
1✔
883
  }
884
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc