• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19820

22 May 2025 10:41PM UTC coverage: 88.611% (+0.02%) from 88.595%
#19820

push

github

web-flow
Rename PSM interop fallback test suite to light (#12093)

34632 of 39083 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.77
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.Sets;
26
import io.grpc.InternalLogId;
27
import io.grpc.NameResolver;
28
import io.grpc.Status;
29
import io.grpc.StatusOr;
30
import io.grpc.SynchronizationContext;
31
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
32
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
34
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
35
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
36
import io.grpc.xds.client.XdsClient;
37
import io.grpc.xds.client.XdsClient.ResourceWatcher;
38
import io.grpc.xds.client.XdsLogger;
39
import io.grpc.xds.client.XdsResourceType;
40
import java.io.Closeable;
41
import java.io.IOException;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.HashSet;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Objects;
48
import java.util.Set;
49
import java.util.concurrent.ScheduledExecutorService;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
55
 * maintains the watchers for the xds resources and when an update is received, it either requests
56
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
57
 * applies to a single data plane authority.
58
 */
59
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
60
  // Resource type under which cluster (CDS) watchers are registered in resourceWatchers.
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
61
  // Resource type under which endpoint (EDS) watchers are registered in resourceWatchers.
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
62
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
63
  // Client used to start and cancel resource watches.
  private final XdsClient xdsClient;
64
  // Receives every published configuration (or error status).
  private final XdsConfigWatcher xdsConfigWatcher;
65
  // Context in which watcher bookkeeping and config publication run.
  private final SynchronizationContext syncContext;
66
  // Hostname used to select the active virtual host from the route configuration.
  private final String dataPlaneAuthority;
67

68
  private final InternalLogId logId;
69
  private final XdsLogger logger;
70
  // Last result handed to xdsConfigWatcher; null until the first publication.
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
71
  // Registered watchers, grouped by resource type and then keyed by resource name.
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
72

73
  /**
   * Creates the manager and schedules the initial listener watch on {@code syncContext}.
   *
   * @param xdsClient client used to register and cancel resource watches; must not be null
   * @param xdsConfigWatcher receiver of published configuration updates; must not be null
   * @param syncContext context in which the watcher bookkeeping runs; must not be null
   * @param dataPlaneAuthority hostname used to select the virtual host; must not be null
   * @param listenerName name of the listener resource to watch; also part of the log id
   * @param nameResolverArgs only null-checked by this constructor
   * @param scheduler only null-checked by this constructor
   */
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
                       SynchronizationContext syncContext, String dataPlaneAuthority,
                       String listenerName, NameResolver.Args nameResolverArgs,
                       ScheduledExecutorService scheduler) {
    this.logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
    this.logger = XdsLogger.withLogId(this.logId);
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
    checkNotNull(nameResolverArgs, "nameResolverArgs");
    checkNotNull(scheduler, "scheduler");

    // start the ball rolling: the listener watcher is created and registered inside the
    // synchronization context
    syncContext.execute(() -> {
      LdsWatcher listenerWatcher = new LdsWatcher(listenerName);
      addWatcher(listenerWatcher);
    });
  }
1✔
89

90
  public static String toContextStr(String typeName, String resourceName) {
91
    return typeName + " resource " + resourceName;
1✔
92
  }
93

94
  @Override
95
  public Closeable subscribeToCluster(String clusterName) {
96

97
    checkNotNull(clusterName, "clusterName");
1✔
98
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
99

100
    syncContext.execute(() -> {
1✔
101
      addClusterWatcher(clusterName, subscription, 1);
1✔
102
      maybePublishConfig();
1✔
103
    });
1✔
104

105
    return subscription;
1✔
106
  }
107

108
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
109
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
110
    XdsResourceType<T> type = watcher.type;
1✔
111
    String resourceName = watcher.resourceName;
1✔
112

113
    @SuppressWarnings("unchecked")
114
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
115
    if (typeWatchers == null) {
1✔
116
      typeWatchers = new TypeWatchers<>(type);
1✔
117
      resourceWatchers.put(type, typeWatchers);
1✔
118
    }
119

120
    typeWatchers.add(resourceName, watcher);
1✔
121
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
122
  }
1✔
123

124
  private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
125
    if (watcher == null) {
1✔
126
      return;
×
127
    }
128
    watcher.parentContexts.remove(parentContext);
1✔
129
    if (watcher.parentContexts.isEmpty()) {
1✔
130
      cancelWatcher(watcher);
1✔
131
    }
132
  }
1✔
133

134
  private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
135
    if (watcher == null) {
1✔
136
      return;
×
137
    }
138
    watcher.parentContexts.remove(parentContext);
1✔
139
    if (watcher.parentContexts.isEmpty()) {
1✔
140
      cancelWatcher(watcher);
1✔
141
    }
142
  }
1✔
143

144
  private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
145
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
146

147
    if (watcher == null) {
1✔
148
      return;
×
149
    }
150

151
    if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
1✔
152
      throwIfParentContextsNotEmpty(watcher);
1✔
153
    }
154

155
    watcher.cancelled = true;
1✔
156
    XdsResourceType<T> type = watcher.type;
1✔
157
    String resourceName = watcher.resourceName;
1✔
158

159
    @SuppressWarnings("unchecked")
160
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
161
    if (typeWatchers == null) {
1✔
162
      logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
×
163
      return;
×
164
    }
165

166
    typeWatchers.watchers.remove(resourceName);
1✔
167
    xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
1✔
168

169
  }
1✔
170

171
  private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
172
    if (watcher instanceof CdsWatcher) {
1✔
173
      CdsWatcher cdsWatcher = (CdsWatcher) watcher;
1✔
174
      if (!cdsWatcher.parentContexts.isEmpty()) {
1✔
175
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
176
            cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
×
177
        throw new IllegalStateException(msg);
×
178
      }
179
    } else if (watcher instanceof EdsWatcher) {
1✔
180
      EdsWatcher edsWatcher = (EdsWatcher) watcher;
1✔
181
      if (!edsWatcher.parentContexts.isEmpty()) {
1✔
182
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
183
            edsWatcher.resourceName(), edsWatcher.parentContexts);
×
184
        throw new IllegalStateException(msg);
×
185
      }
186
    }
187
  }
1✔
188

189
  public void shutdown() {
190
    syncContext.execute(() -> {
1✔
191
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
192
        shutdownWatchersForType(watchers);
1✔
193
      }
1✔
194
      resourceWatchers.clear();
1✔
195
    });
1✔
196
  }
1✔
197

198
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
199
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
200
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
201
          watcherEntry.getValue());
1✔
202
      watcherEntry.getValue().cancelled = true;
1✔
203
    }
1✔
204
  }
1✔
205

206
  private void releaseSubscription(ClusterSubscription subscription) {
207
    checkNotNull(subscription, "subscription");
1✔
208
    String clusterName = subscription.getClusterName();
1✔
209
    syncContext.execute(() -> {
1✔
210
      XdsWatcherBase<?> cdsWatcher =
1✔
211
          resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
212
      if (cdsWatcher == null) {
1✔
213
        return; // already released while waiting for the syncContext
×
214
      }
215
      cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
1✔
216
      maybePublishConfig();
1✔
217
    });
1✔
218
  }
1✔
219

220
  private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
221
    checkNotNull(root, "root");
1✔
222

223
    cancelCdsWatcher(root, parentContext);
1✔
224

225
    if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
1✔
226
      return;
1✔
227
    }
228

229
    XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
1✔
230
    switch (cdsUpdate.clusterType()) {
1✔
231
      case EDS:
232
        String edsServiceName = root.getEdsServiceName();
1✔
233
        EdsWatcher edsWatcher =
1✔
234
            (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
1✔
235
        cancelEdsWatcher(edsWatcher, root);
1✔
236
        break;
1✔
237
      case AGGREGATE:
238
        for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
239
          CdsWatcher clusterWatcher =
1✔
240
              (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
1✔
241
          if (clusterWatcher != null) {
1✔
242
            cancelClusterWatcherTree(clusterWatcher, root);
1✔
243
          }
244
        }
1✔
245
        break;
1✔
246
      case LOGICAL_DNS:
247
        // no eds needed, so everything happens in cancelCdsWatcher()
248
        break;
×
249
      default:
250
        throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
×
251
    }
252
  }
1✔
253

254
  /**
255
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
256
   * the watchers.
257
   */
258
  private void maybePublishConfig() {
259
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
260
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
261
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
262
        .anyMatch(XdsWatcherBase::missingResult);
1✔
263
    if (waitingOnResource) {
1✔
264
      return;
1✔
265
    }
266

267
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
268
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
269
      return;
1✔
270
    }
271
    assert newUpdate.hasValue()
1✔
272
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
273
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
274
    lastUpdate = newUpdate;
1✔
275
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
276
  }
1✔
277

278
  @VisibleForTesting
279
  StatusOr<XdsConfig> buildUpdate() {
280
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
281

282
    // Iterate watchers and build the XdsConfig
283

284
    // Will only be 1 listener and 1 route resource
285
    RdsUpdateSupplier routeSource = null;
1✔
286
    for (XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher :
287
        getWatchers(XdsListenerResource.getInstance()).values()) {
1✔
288
      if (!ldsWatcher.getData().hasValue()) {
1✔
289
        return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
290
      }
291
      XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
292
      builder.setListener(ldsUpdate);
1✔
293
      routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
1✔
294
    }
1✔
295

296
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
297
    if (!statusOrRdsUpdate.hasValue()) {
1✔
298
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
299
    }
300
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
301
    builder.setRoute(rdsUpdate);
1✔
302

303
    VirtualHost activeVirtualHost =
1✔
304
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
305
    if (activeVirtualHost == null) {
1✔
306
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
307
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
308
    }
309
    builder.setVirtualHost(activeVirtualHost);
1✔
310

311
    Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers =
1✔
312
        getWatchers(ENDPOINT_RESOURCE);
1✔
313
    Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers =
1✔
314
        getWatchers(CLUSTER_RESOURCE);
1✔
315

316
    // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
317
    List<String> topLevelClusters =
1✔
318
        cdsWatchers.values().stream()
1✔
319
            .filter(XdsDependencyManager::isTopLevelCluster)
1✔
320
            .map(XdsWatcherBase<?>::resourceName)
1✔
321
            .distinct()
1✔
322
            .collect(Collectors.toList());
1✔
323

324
    // Flatten multi-level aggregates into lists of leaf clusters
325
    Set<String> leafNames =
1✔
326
        addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
1✔
327

328
    addLeavesToBuilder(builder, edsWatchers, leafNames);
1✔
329

330
    return StatusOr.fromValue(builder.build());
1✔
331
  }
332

333
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
334
      XdsResourceType<T> resourceType) {
335
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
336
    if (typeWatchers == null) {
1✔
337
      return Collections.emptyMap();
1✔
338
    }
339
    assert typeWatchers.resourceType == resourceType;
1✔
340
    @SuppressWarnings("unchecked")
341
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
342
    return tTypeWatchers.watchers;
1✔
343
  }
344

345
  private void addLeavesToBuilder(
346
      XdsConfig.XdsConfigBuilder builder,
347
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
348
      Set<String> leafNames) {
349
    for (String clusterName : leafNames) {
1✔
350
      CdsWatcher cdsWatcher = getCluster(clusterName);
1✔
351
      StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
1✔
352

353
      if (!cdsUpdateOr.hasValue()) {
1✔
354
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
1✔
355
        continue;
1✔
356
      }
357

358
      XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
1✔
359
      if (cdsUpdate.clusterType() == ClusterType.EDS) {
1✔
360
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
361
            edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
362
        EndpointConfig child;
363
        if (edsWatcher != null) {
1✔
364
          child = new EndpointConfig(edsWatcher.getData());
1✔
365
        } else {
366
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
367
              "EDS resource not found for cluster " + clusterName)));
368
        }
369
        builder.addCluster(clusterName, StatusOr.fromValue(
1✔
370
            new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
371
      } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
1✔
372
        builder.addCluster(clusterName, StatusOr.fromStatus(
×
373
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
×
374
      }
375
    }
1✔
376
  }
1✔
377

378
  // Adds the top-level clusters to the builder and returns the leaf cluster names
379
  private Set<String> addTopLevelClustersToBuilder(
380
      XdsConfig.XdsConfigBuilder builder,
381
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
382
      Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers,
383
      List<String> topLevelClusters) {
384

385
    Set<String> leafClusterNames = new HashSet<>();
1✔
386
    for (String clusterName : topLevelClusters) {
1✔
387
      CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
1✔
388
      StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
389
      if (!cdsWatcher.hasDataValue()) {
1✔
390
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
391
        continue;
1✔
392
      }
393

394
      XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
395
      XdsConfig.XdsClusterConfig.ClusterChild child;
396
      switch (cdsUpdate.clusterType()) {
1✔
397
        case AGGREGATE:
398
          Set<String> leafNames = new HashSet<>();
1✔
399
          addLeafNames(leafNames, cdsUpdate);
1✔
400
          child = new AggregateConfig(leafNames);
1✔
401
          leafClusterNames.addAll(leafNames);
1✔
402
          break;
1✔
403
        case EDS:
404
          XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
405
              edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
406
          if (edsWatcher != null) {
1✔
407
            child = new EndpointConfig(edsWatcher.getData());
1✔
408
          } else {
409
            child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
410
                "EDS resource not found for cluster " + clusterName)));
411
          }
412
          break;
×
413
        case LOGICAL_DNS:
414
          // TODO get the resolved endpoint configuration
415
          child = new EndpointConfig(StatusOr.fromStatus(
1✔
416
              Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
417
          break;
1✔
418
        default:
419
          throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
420
      }
421
      builder.addCluster(clusterName, StatusOr.fromValue(
1✔
422
          new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
423
    }
1✔
424

425
    return leafClusterNames;
1✔
426
  }
427

428
  private void addLeafNames(Set<String> leafNames, XdsClusterResource.CdsUpdate cdsUpdate) {
429
    for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
430
      if (leafNames.contains(cluster)) {
1✔
431
        continue;
1✔
432
      }
433
      StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
1✔
434
      if (data == null || !data.hasValue() || data.getValue() == null) {
1✔
435
        leafNames.add(cluster);
1✔
436
        continue;
1✔
437
      }
438
      if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
1✔
439
        addLeafNames(leafNames, data.getValue());
1✔
440
      } else {
441
        leafNames.add(cluster);
1✔
442
      }
443
    }
1✔
444
  }
1✔
445

446
  private static boolean isTopLevelCluster(
447
      XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher) {
448
    return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
1✔
449
        .anyMatch(depth -> depth == 1);
1✔
450
  }
451

452
  @Override
453
  public String toString() {
454
    return logId.toString();
×
455
  }
456

457
  // Returns true if the watcher was added, false if it already exists
458
  private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
459
    TypeWatchers<?> typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance());
1✔
460
    if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) {
1✔
461
      addWatcher(new EdsWatcher(edsServiceName, parentContext));
1✔
462
      return true;
1✔
463
    }
464

465
    EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName);
1✔
466
    watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
1✔
467
    return false;
1✔
468
  }
469

470
  private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
471
    TypeWatchers<?> clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
1✔
472
    if (clusterWatchers != null) {
1✔
473
      CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName);
1✔
474
      if (watcher != null) {
1✔
475
        watcher.parentContexts.put(parentContext, depth);
1✔
476
        return;
1✔
477
      }
478
    }
479

480
    addWatcher(new CdsWatcher(clusterName, parentContext, depth));
1✔
481
  }
1✔
482

483
  private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
484
                            List<VirtualHost> oldVirtualHosts, boolean sameParentContext) {
485
    VirtualHost oldVirtualHost =
1✔
486
        RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority);
1✔
487
    VirtualHost virtualHost =
1✔
488
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
489

490
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
491
    Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
1✔
492

493
    if (sameParentContext) {
1✔
494
      // Calculate diffs.
495
      Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
1✔
496
      Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
1✔
497

498
      deletedClusters.forEach(watcher ->
1✔
499
          cancelClusterWatcherTree(getCluster(watcher), newParentContext));
1✔
500
      addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
501
    } else {
1✔
502
      newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
503
    }
504
  }
1✔
505

506
  private String nodeInfo() {
507
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
508
  }
509

510
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
511
    if (virtualHost == null) {
1✔
512
      return Collections.emptySet();
1✔
513
    }
514

515
    // Get all cluster names to which requests can be routed through the virtual host.
516
    Set<String> clusters = new HashSet<>();
1✔
517
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
518
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
519
      if (action == null) {
1✔
520
        continue;
1✔
521
      }
522
      if (action.cluster() != null) {
1✔
523
        clusters.add(action.cluster());
1✔
524
      } else if (action.weightedClusters() != null) {
1✔
525
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
526
          clusters.add(weighedCluster.name());
1✔
527
        }
1✔
528
      }
529
    }
1✔
530

531
    return clusters;
1✔
532
  }
533

534
  private CdsWatcher getCluster(String clusterName) {
535
    return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
536
  }
537

538
  private static class TypeWatchers<T extends ResourceUpdate> {
539
    // Key is resource name
540
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
541
    final XdsResourceType<T> resourceType;
542

543
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
544
      this.resourceType = resourceType;
1✔
545
    }
1✔
546

547
    public void add(String resourceName, XdsWatcherBase<T> watcher) {
548
      watchers.put(resourceName, watcher);
1✔
549
    }
1✔
550
  }
551

552
  public interface XdsConfigWatcher {
553
    /**
554
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
555
     * INTERNAL.
556
     */
557
    void onUpdate(StatusOr<XdsConfig> config);
558
  }
559

560
  private class ClusterSubscription implements Closeable {
561
    String clusterName;
562

563
    public ClusterSubscription(String clusterName) {
1✔
564
      this.clusterName = clusterName;
1✔
565
    }
1✔
566

567
    public String getClusterName() {
568
      return clusterName;
1✔
569
    }
570

571
    @Override
572
    public void close() throws IOException {
573
      releaseSubscription(this);
1✔
574
    }
1✔
575
  }
576

577
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
578
      implements ResourceWatcher<T> {
579
    private final XdsResourceType<T> type;
580
    private final String resourceName;
581
    boolean cancelled;
582

583
    @Nullable
584
    private StatusOr<T> data;
585

586

587
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
588
      this.type = checkNotNull(type, "type");
1✔
589
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
590
    }
1✔
591

592
    @Override
593
    public void onError(Status error) {
594
      checkNotNull(error, "error");
1✔
595
      if (cancelled) {
1✔
596
        return;
×
597
      }
598
      // Don't update configuration on error, if we've already received configuration
599
      if (!hasDataValue()) {
1✔
600
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
601
            String.format("Error retrieving %s: %s: %s",
1✔
602
              toContextString(), error.getCode(), error.getDescription())));
1✔
603
        maybePublishConfig();
1✔
604
      }
605
    }
1✔
606

607
    @Override
608
    public void onResourceDoesNotExist(String resourceName) {
609
      if (cancelled) {
1✔
610
        return;
1✔
611
      }
612

613
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
614
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
615
          toContextString() + " does not exist" + nodeInfo()));
1✔
616
      maybePublishConfig();
1✔
617
    }
1✔
618

619
    boolean missingResult() {
620
      return data == null;
1✔
621
    }
622

623
    @Nullable
624
    StatusOr<T> getData() {
625
      return data;
1✔
626
    }
627

628
    boolean hasDataValue() {
629
      return data != null && data.hasValue();
1✔
630
    }
631

632
    String resourceName() {
633
      return resourceName;
1✔
634
    }
635

636
    protected void setData(T data) {
637
      checkNotNull(data, "data");
1✔
638
      this.data = StatusOr.fromValue(data);
1✔
639
    }
1✔
640

641
    protected void setDataAsStatus(Status status) {
642
      checkNotNull(status, "status");
1✔
643
      this.data = StatusOr.fromStatus(status);
1✔
644
    }
1✔
645

646
    public String toContextString() {
647
      return toContextStr(type.typeName(), resourceName);
1✔
648
    }
649
  }
650

651
  private interface RdsUpdateSupplier {
652
    StatusOr<RdsUpdate> getRdsUpdate();
653
  }
654

655
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
656
      implements RdsUpdateSupplier {
657
    String rdsName;
658

659
    private LdsWatcher(String resourceName) {
1✔
660
      super(XdsListenerResource.getInstance(), resourceName);
1✔
661
    }
1✔
662

663
    @Override
664
    public void onChanged(XdsListenerResource.LdsUpdate update) {
665
      checkNotNull(update, "update");
1✔
666
      if (cancelled) {
1✔
667
        return;
1✔
668
      }
669

670
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
671
      List<VirtualHost> virtualHosts;
672
      String rdsName;
673
      if (httpConnectionManager == null) {
1✔
674
        // TCP listener. Unsupported config
675
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
676
        rdsName = null;
1✔
677
      } else {
678
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
679
        rdsName = httpConnectionManager.rdsName();
1✔
680
      }
681
      StatusOr<RdsUpdate> activeRdsUpdate = getRouteSource().getRdsUpdate();
1✔
682
      List<VirtualHost> activeVirtualHosts = activeRdsUpdate.hasValue()
1✔
683
          ? activeRdsUpdate.getValue().virtualHosts
1✔
684
          : Collections.emptyList();
1✔
685

686
      boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
1✔
687
      if (changedRdsName) {
1✔
688
        cleanUpRdsWatcher();
1✔
689
      }
690

691
      if (virtualHosts != null) {
1✔
692
        // No RDS watcher since we are getting RDS updates via LDS
693
        updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null);
1✔
694
        this.rdsName = null;
1✔
695
      } else if (changedRdsName) {
1✔
696
        this.rdsName = rdsName;
1✔
697
        addWatcher(new RdsWatcher(rdsName));
1✔
698
        logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
699
      }
700

701
      setData(update);
1✔
702
      maybePublishConfig();
1✔
703
    }
1✔
704

705
    @Override
706
    public void onResourceDoesNotExist(String resourceName) {
707
      if (cancelled) {
1✔
708
        return;
×
709
      }
710

711
      checkArgument(resourceName().equals(resourceName), "Resource name does not match");
1✔
712
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
713
          toContextString() + " does not exist" + nodeInfo()));
1✔
714
      cleanUpRdsWatcher();
1✔
715
      rdsName = null;
1✔
716
      maybePublishConfig();
1✔
717
    }
1✔
718

719
    private void cleanUpRdsWatcher() {
720
      RdsWatcher oldRdsWatcher = getRdsWatcher();
1✔
721
      if (oldRdsWatcher != null) {
1✔
722
        cancelWatcher(oldRdsWatcher);
1✔
723
        logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
1✔
724

725
        // Cleanup clusters (as appropriate) that had the old rds watcher as a parent
726
        if (!oldRdsWatcher.hasDataValue() || resourceWatchers.get(CLUSTER_RESOURCE) == null) {
1✔
727
          return;
×
728
        }
729
        for (XdsWatcherBase<?> watcher :
730
            resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) {
1✔
731
          cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
1✔
732
        }
1✔
733
      }
734
    }
1✔
735

736
    private RdsWatcher getRdsWatcher() {
737
      if (rdsName == null) {
1✔
738
        return null;
1✔
739
      }
740
      TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
741
      if (watchers == null) {
1✔
742
        return null;
×
743
      }
744
      return (RdsWatcher) watchers.watchers.get(rdsName);
1✔
745
    }
746

747
    public RdsUpdateSupplier getRouteSource() {
748
      if (!hasDataValue()) {
1✔
749
        return this;
1✔
750
      }
751
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
752
      if (hcm == null) {
1✔
753
        return this;
1✔
754
      }
755
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
756
      if (virtualHosts != null) {
1✔
757
        return this;
1✔
758
      }
759
      RdsWatcher rdsWatcher = getRdsWatcher();
1✔
760
      assert rdsWatcher != null;
1✔
761
      return rdsWatcher;
1✔
762
    }
763

764
    @Override
765
    public StatusOr<RdsUpdate> getRdsUpdate() {
766
      if (missingResult()) {
1✔
767
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
1✔
768
      }
769
      if (!getData().hasValue()) {
1✔
770
        return StatusOr.fromStatus(getData().getStatus());
1✔
771
      }
772
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
773
      if (hcm == null) {
1✔
774
        return StatusOr.fromStatus(
1✔
775
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
776
      }
777
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
778
      if (virtualHosts == null) {
1✔
779
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
780
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
781
        // bug
782
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
783
      }
784
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
785
    }
786
  }
787

788
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
789

790
    public RdsWatcher(String resourceName) {
1✔
791
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
792
    }
1✔
793

794
    @Override
795
    public void onChanged(RdsUpdate update) {
796
      checkNotNull(update, "update");
1✔
797
      if (cancelled) {
1✔
798
        return;
1✔
799
      }
800
      List<VirtualHost> oldVirtualHosts = hasDataValue()
1✔
801
          ? getData().getValue().virtualHosts
1✔
802
          : Collections.emptyList();
1✔
803
      setData(update);
1✔
804
      updateRoutes(update.virtualHosts, this, oldVirtualHosts, true);
1✔
805
      maybePublishConfig();
1✔
806
    }
1✔
807

808
    @Override
809
    public StatusOr<RdsUpdate> getRdsUpdate() {
810
      if (missingResult()) {
1✔
811
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
812
      }
813
      return getData();
1✔
814
    }
815
  }
816

817
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
818
    Map<Object, Integer> parentContexts = new HashMap<>();
1✔
819

820
    CdsWatcher(String resourceName, Object parentContext, int depth) {
1✔
821
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
822
      this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
1✔
823
    }
1✔
824

825
    @Override
826
    public void onChanged(XdsClusterResource.CdsUpdate update) {
827
      checkNotNull(update, "update");
1✔
828
      if (cancelled) {
1✔
829
        return;
1✔
830
      }
831
      switch (update.clusterType()) {
1✔
832
        case EDS:
833
          setData(update);
1✔
834
          if (!addEdsWatcher(getEdsServiceName(), this))  {
1✔
835
            maybePublishConfig();
1✔
836
          }
837
          break;
838
        case LOGICAL_DNS:
839
          setData(update);
1✔
840
          maybePublishConfig();
1✔
841
          // no eds needed
842
          break;
1✔
843
        case AGGREGATE:
844
          Object parentContext = this;
1✔
845
          int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
1✔
846
          if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
847
            logger.log(XdsLogger.XdsLogLevel.WARNING,
×
848
                "Cluster recursion depth limit exceeded for cluster {0}", resourceName());
×
849
            Status error = Status.UNAVAILABLE.withDescription(
×
850
                "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
851
            setDataAsStatus(error);
×
852
          }
853
          if (hasDataValue()) {
1✔
854
            Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
1✔
855
                ? new HashSet<>(getData().getValue().prioritizedClusterNames())
1✔
856
                : Collections.emptySet();
1✔
857
            Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
1✔
858

859
            Set<String> deletedClusters = Sets.difference(oldNames, newNames);
1✔
860
            deletedClusters.forEach((cluster)
1✔
861
                -> cancelClusterWatcherTree(getCluster(cluster), parentContext));
1✔
862

863
            if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
864
              setData(update);
1✔
865
              Set<String> addedClusters = Sets.difference(newNames, oldNames);
1✔
866
              addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
1✔
867

868
              if (addedClusters.isEmpty()) {
1✔
869
                maybePublishConfig();
×
870
              }
871
            } else { // data was set to error status above
1✔
872
              maybePublishConfig();
×
873
            }
874

875
          } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
876
            setData(update);
1✔
877
            update.prioritizedClusterNames()
1✔
878
                .forEach(name -> addClusterWatcher(name, parentContext, depth));
1✔
879
            maybePublishConfig();
1✔
880
          }
881
          break;
882
        default:
883
          Status error = Status.UNAVAILABLE.withDescription(
×
884
              "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
885
          setDataAsStatus(error);
×
886
          maybePublishConfig();
×
887
      }
888
    }
1✔
889

890
    public String getEdsServiceName() {
891
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
892
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
893
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
894
      if (edsServiceName == null) {
1✔
895
        edsServiceName = cdsUpdate.clusterName();
×
896
      }
897
      return edsServiceName;
1✔
898
    }
899
  }
900

901
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
902
    private final Set<CdsWatcher> parentContexts = new HashSet<>();
1✔
903

904
    private EdsWatcher(String resourceName, CdsWatcher parentContext) {
1✔
905
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
906
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
907
    }
1✔
908

909
    @Override
910
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
911
      if (cancelled) {
1✔
912
        return;
1✔
913
      }
914
      setData(checkNotNull(update, "update"));
1✔
915
      maybePublishConfig();
1✔
916
    }
1✔
917

918
    void addParentContext(CdsWatcher parentContext) {
919
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
920
    }
1✔
921
  }
922
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc