• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19793

23 Apr 2025 04:18PM UTC coverage: 88.641% (+0.003%) from 88.638%
#19793

push

github

web-flow
xds: XdsDepManager should ignore updates after shutdown

This prevents a NPE and subsequent channel panic when trying to build a
config (because there are no watchers, so waitingOnResource==false)
without any listener and route.
```
java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null
    at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295)
    at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266)
    at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899)
    at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888)
    at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929)
    at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837)
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96)
```

I think this fully-fixes the problem today, but not tomorrow.
subscribeToCluster() is racy as well, but not yet used.

This was noticed when idleTimeout was firing, with some other code
calling getState(true) to wake the channel back up. That may have made
this panic more visible than it would be otherwise, but that has not
been investigated.

b/412474567

34789 of 39247 relevant lines covered (88.64%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.77
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.Sets;
26
import io.grpc.InternalLogId;
27
import io.grpc.NameResolver;
28
import io.grpc.Status;
29
import io.grpc.StatusOr;
30
import io.grpc.SynchronizationContext;
31
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
32
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
34
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
35
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
36
import io.grpc.xds.client.XdsClient;
37
import io.grpc.xds.client.XdsClient.ResourceWatcher;
38
import io.grpc.xds.client.XdsLogger;
39
import io.grpc.xds.client.XdsResourceType;
40
import java.io.Closeable;
41
import java.io.IOException;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.HashSet;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Objects;
48
import java.util.Set;
49
import java.util.concurrent.ScheduledExecutorService;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
55
 * maintains the watchers for the xds resources and when an update is received, it either requests
56
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
57
 * applies to a single data plane authority.
58
 */
59
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
60
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
61
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
62
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
63
  private final XdsClient xdsClient;
64
  private final XdsConfigWatcher xdsConfigWatcher;
65
  private final SynchronizationContext syncContext;
66
  private final String dataPlaneAuthority;
67

68
  private final InternalLogId logId;
69
  private final XdsLogger logger;
70
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
71
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
72

73
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
74
                       SynchronizationContext syncContext, String dataPlaneAuthority,
75
                       String listenerName, NameResolver.Args nameResolverArgs,
76
                       ScheduledExecutorService scheduler) {
1✔
77
    logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
1✔
78
    logger = XdsLogger.withLogId(logId);
1✔
79
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
80
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
81
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
82
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
83
    checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
84
    checkNotNull(scheduler, "scheduler");
1✔
85

86
    // start the ball rolling
87
    syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
1✔
88
  }
1✔
89

90
  public static String toContextStr(String typeName, String resourceName) {
91
    return typeName + " resource " + resourceName;
1✔
92
  }
93

94
  @Override
95
  public Closeable subscribeToCluster(String clusterName) {
96

97
    checkNotNull(clusterName, "clusterName");
1✔
98
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
99

100
    syncContext.execute(() -> {
1✔
101
      addClusterWatcher(clusterName, subscription, 1);
1✔
102
      maybePublishConfig();
1✔
103
    });
1✔
104

105
    return subscription;
1✔
106
  }
107

108
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
109
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
110
    XdsResourceType<T> type = watcher.type;
1✔
111
    String resourceName = watcher.resourceName;
1✔
112

113
    @SuppressWarnings("unchecked")
114
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
115
    if (typeWatchers == null) {
1✔
116
      typeWatchers = new TypeWatchers<>(type);
1✔
117
      resourceWatchers.put(type, typeWatchers);
1✔
118
    }
119

120
    typeWatchers.add(resourceName, watcher);
1✔
121
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
122
  }
1✔
123

124
  private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
125
    if (watcher == null) {
1✔
126
      return;
×
127
    }
128
    watcher.parentContexts.remove(parentContext);
1✔
129
    if (watcher.parentContexts.isEmpty()) {
1✔
130
      cancelWatcher(watcher);
1✔
131
    }
132
  }
1✔
133

134
  private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
135
    if (watcher == null) {
1✔
136
      return;
×
137
    }
138
    watcher.parentContexts.remove(parentContext);
1✔
139
    if (watcher.parentContexts.isEmpty()) {
1✔
140
      cancelWatcher(watcher);
1✔
141
    }
142
  }
1✔
143

144
  private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
145
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
146

147
    if (watcher == null) {
1✔
148
      return;
×
149
    }
150

151
    if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
1✔
152
      throwIfParentContextsNotEmpty(watcher);
1✔
153
    }
154

155
    watcher.cancelled = true;
1✔
156
    XdsResourceType<T> type = watcher.type;
1✔
157
    String resourceName = watcher.resourceName;
1✔
158

159
    @SuppressWarnings("unchecked")
160
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
161
    if (typeWatchers == null) {
1✔
162
      logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
×
163
      return;
×
164
    }
165

166
    typeWatchers.watchers.remove(resourceName);
1✔
167
    xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
1✔
168

169
  }
1✔
170

171
  private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
172
    if (watcher instanceof CdsWatcher) {
1✔
173
      CdsWatcher cdsWatcher = (CdsWatcher) watcher;
1✔
174
      if (!cdsWatcher.parentContexts.isEmpty()) {
1✔
175
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
176
            cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
×
177
        throw new IllegalStateException(msg);
×
178
      }
179
    } else if (watcher instanceof EdsWatcher) {
1✔
180
      EdsWatcher edsWatcher = (EdsWatcher) watcher;
1✔
181
      if (!edsWatcher.parentContexts.isEmpty()) {
1✔
182
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
183
            edsWatcher.resourceName(), edsWatcher.parentContexts);
×
184
        throw new IllegalStateException(msg);
×
185
      }
186
    }
187
  }
1✔
188

189
  public void shutdown() {
190
    syncContext.execute(() -> {
1✔
191
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
192
        shutdownWatchersForType(watchers);
1✔
193
      }
1✔
194
      resourceWatchers.clear();
1✔
195
    });
1✔
196
  }
1✔
197

198
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
199
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
200
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
201
          watcherEntry.getValue());
1✔
202
      watcherEntry.getValue().cancelled = true;
1✔
203
    }
1✔
204
  }
1✔
205

206
  private void releaseSubscription(ClusterSubscription subscription) {
207
    checkNotNull(subscription, "subscription");
1✔
208
    String clusterName = subscription.getClusterName();
1✔
209
    syncContext.execute(() -> {
1✔
210
      XdsWatcherBase<?> cdsWatcher =
1✔
211
          resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
212
      if (cdsWatcher == null) {
1✔
213
        return; // already released while waiting for the syncContext
×
214
      }
215
      cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
1✔
216
      maybePublishConfig();
1✔
217
    });
1✔
218
  }
1✔
219

220
  private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
221
    checkNotNull(root, "root");
1✔
222

223
    cancelCdsWatcher(root, parentContext);
1✔
224

225
    if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
1✔
226
      return;
1✔
227
    }
228

229
    XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
1✔
230
    switch (cdsUpdate.clusterType()) {
1✔
231
      case EDS:
232
        String edsServiceName = root.getEdsServiceName();
1✔
233
        EdsWatcher edsWatcher =
1✔
234
            (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
1✔
235
        cancelEdsWatcher(edsWatcher, root);
1✔
236
        break;
1✔
237
      case AGGREGATE:
238
        for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
239
          CdsWatcher clusterWatcher =
1✔
240
              (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
1✔
241
          if (clusterWatcher != null) {
1✔
242
            cancelClusterWatcherTree(clusterWatcher, root);
1✔
243
          }
244
        }
1✔
245
        break;
1✔
246
      case LOGICAL_DNS:
247
        // no eds needed, so everything happens in cancelCdsWatcher()
248
        break;
×
249
      default:
250
        throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
×
251
    }
252
  }
1✔
253

254
  /**
255
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
256
   * the watchers.
257
   */
258
  private void maybePublishConfig() {
259
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
260
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
261
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
262
        .anyMatch(XdsWatcherBase::missingResult);
1✔
263
    if (waitingOnResource) {
1✔
264
      return;
1✔
265
    }
266

267
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
268
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
269
      return;
1✔
270
    }
271
    assert newUpdate.hasValue()
1✔
272
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
273
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
274
    lastUpdate = newUpdate;
1✔
275
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
276
  }
1✔
277

278
  @VisibleForTesting
279
  StatusOr<XdsConfig> buildUpdate() {
280
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
281

282
    // Iterate watchers and build the XdsConfig
283

284
    // Will only be 1 listener and 1 route resource
285
    RdsUpdateSupplier routeSource = null;
1✔
286
    for (XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher :
287
        getWatchers(XdsListenerResource.getInstance()).values()) {
1✔
288
      if (!ldsWatcher.getData().hasValue()) {
1✔
289
        return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
290
      }
291
      XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
292
      builder.setListener(ldsUpdate);
1✔
293
      routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
1✔
294
    }
1✔
295

296
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
297
    if (!statusOrRdsUpdate.hasValue()) {
1✔
298
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
299
    }
300
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
301
    builder.setRoute(rdsUpdate);
1✔
302

303
    VirtualHost activeVirtualHost =
1✔
304
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
305
    if (activeVirtualHost == null) {
1✔
306
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
307
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
308
    }
309
    builder.setVirtualHost(activeVirtualHost);
1✔
310

311
    Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers =
1✔
312
        getWatchers(ENDPOINT_RESOURCE);
1✔
313
    Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers =
1✔
314
        getWatchers(CLUSTER_RESOURCE);
1✔
315

316
    // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
317
    List<String> topLevelClusters =
1✔
318
        cdsWatchers.values().stream()
1✔
319
            .filter(XdsDependencyManager::isTopLevelCluster)
1✔
320
            .map(XdsWatcherBase<?>::resourceName)
1✔
321
            .distinct()
1✔
322
            .collect(Collectors.toList());
1✔
323

324
    // Flatten multi-level aggregates into lists of leaf clusters
325
    Set<String> leafNames =
1✔
326
        addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
1✔
327

328
    addLeavesToBuilder(builder, edsWatchers, leafNames);
1✔
329

330
    return StatusOr.fromValue(builder.build());
1✔
331
  }
332

333
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
334
      XdsResourceType<T> resourceType) {
335
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
336
    if (typeWatchers == null) {
1✔
337
      return Collections.emptyMap();
1✔
338
    }
339
    assert typeWatchers.resourceType == resourceType;
1✔
340
    @SuppressWarnings("unchecked")
341
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
342
    return tTypeWatchers.watchers;
1✔
343
  }
344

345
  private void addLeavesToBuilder(
346
      XdsConfig.XdsConfigBuilder builder,
347
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
348
      Set<String> leafNames) {
349
    for (String clusterName : leafNames) {
1✔
350
      CdsWatcher cdsWatcher = getCluster(clusterName);
1✔
351
      StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
1✔
352

353
      if (!cdsUpdateOr.hasValue()) {
1✔
354
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
1✔
355
        continue;
1✔
356
      }
357

358
      XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
1✔
359
      if (cdsUpdate.clusterType() == ClusterType.EDS) {
1✔
360
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
361
            edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
362
        EndpointConfig child;
363
        if (edsWatcher != null) {
1✔
364
          child = new EndpointConfig(edsWatcher.getData());
1✔
365
        } else {
366
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
367
              "EDS resource not found for cluster " + clusterName)));
368
        }
369
        builder.addCluster(clusterName, StatusOr.fromValue(
1✔
370
            new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
371
      } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
1✔
372
        builder.addCluster(clusterName, StatusOr.fromStatus(
×
373
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
×
374
      }
375
    }
1✔
376
  }
1✔
377

378
  // Adds the top-level clusters to the builder and returns the leaf cluster names
379
  private Set<String> addTopLevelClustersToBuilder(
380
      XdsConfig.XdsConfigBuilder builder,
381
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
382
      Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers,
383
      List<String> topLevelClusters) {
384

385
    Set<String> leafClusterNames = new HashSet<>();
1✔
386
    for (String clusterName : topLevelClusters) {
1✔
387
      CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
1✔
388
      StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
389
      if (!cdsWatcher.hasDataValue()) {
1✔
390
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
391
        continue;
1✔
392
      }
393

394
      XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
395
      XdsConfig.XdsClusterConfig.ClusterChild child;
396
      switch (cdsUpdate.clusterType()) {
1✔
397
        case AGGREGATE:
398
          Set<String> leafNames = new HashSet<>();
1✔
399
          addLeafNames(leafNames, cdsUpdate);
1✔
400
          child = new AggregateConfig(leafNames);
1✔
401
          leafClusterNames.addAll(leafNames);
1✔
402
          break;
1✔
403
        case EDS:
404
          XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
405
              edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
406
          if (edsWatcher != null) {
1✔
407
            child = new EndpointConfig(edsWatcher.getData());
1✔
408
          } else {
409
            child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
410
                "EDS resource not found for cluster " + clusterName)));
411
          }
412
          break;
×
413
        case LOGICAL_DNS:
414
          // TODO get the resolved endpoint configuration
415
          child = new EndpointConfig(StatusOr.fromStatus(
1✔
416
              Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
417
          break;
1✔
418
        default:
419
          throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
420
      }
421
      builder.addCluster(clusterName, StatusOr.fromValue(
1✔
422
          new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
423
    }
1✔
424

425
    return leafClusterNames;
1✔
426
  }
427

428
  private void addLeafNames(Set<String> leafNames, XdsClusterResource.CdsUpdate cdsUpdate) {
429
    for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
430
      if (leafNames.contains(cluster)) {
1✔
431
        continue;
1✔
432
      }
433
      StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
1✔
434
      if (data == null || !data.hasValue() || data.getValue() == null) {
1✔
435
        leafNames.add(cluster);
1✔
436
        continue;
1✔
437
      }
438
      if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
1✔
439
        addLeafNames(leafNames, data.getValue());
1✔
440
      } else {
441
        leafNames.add(cluster);
1✔
442
      }
443
    }
1✔
444
  }
1✔
445

446
  private static boolean isTopLevelCluster(
447
      XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher) {
448
    return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
1✔
449
        .anyMatch(depth -> depth == 1);
1✔
450
  }
451

452
  @Override
453
  public String toString() {
454
    return logId.toString();
×
455
  }
456

457
  // Returns true if the watcher was added, false if it already exists
458
  private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
459
    TypeWatchers<?> typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance());
1✔
460
    if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) {
1✔
461
      addWatcher(new EdsWatcher(edsServiceName, parentContext));
1✔
462
      return true;
1✔
463
    }
464

465
    EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName);
1✔
466
    watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
1✔
467
    return false;
1✔
468
  }
469

470
  private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
471
    TypeWatchers<?> clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
1✔
472
    if (clusterWatchers != null) {
1✔
473
      CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName);
1✔
474
      if (watcher != null) {
1✔
475
        watcher.parentContexts.put(parentContext, depth);
1✔
476
        return;
1✔
477
      }
478
    }
479

480
    addWatcher(new CdsWatcher(clusterName, parentContext, depth));
1✔
481
  }
1✔
482

483
  private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
484
                            List<VirtualHost> oldVirtualHosts, boolean sameParentContext) {
485
    VirtualHost oldVirtualHost =
1✔
486
        RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority);
1✔
487
    VirtualHost virtualHost =
1✔
488
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
489

490
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
491
    Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
1✔
492

493
    if (sameParentContext) {
1✔
494
      // Calculate diffs.
495
      Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
1✔
496
      Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
1✔
497

498
      deletedClusters.forEach(watcher ->
1✔
499
          cancelClusterWatcherTree(getCluster(watcher), newParentContext));
1✔
500
      addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
501
    } else {
1✔
502
      newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
503
    }
504
  }
1✔
505

506
  private String nodeInfo() {
507
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
508
  }
509

510
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
511
    if (virtualHost == null) {
1✔
512
      return Collections.emptySet();
1✔
513
    }
514

515
    // Get all cluster names to which requests can be routed through the virtual host.
516
    Set<String> clusters = new HashSet<>();
1✔
517
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
518
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
519
      if (action == null) {
1✔
520
        continue;
1✔
521
      }
522
      if (action.cluster() != null) {
1✔
523
        clusters.add(action.cluster());
1✔
524
      } else if (action.weightedClusters() != null) {
1✔
525
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
526
          clusters.add(weighedCluster.name());
1✔
527
        }
1✔
528
      }
529
    }
1✔
530

531
    return clusters;
1✔
532
  }
533

534
  private CdsWatcher getCluster(String clusterName) {
535
    return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
536
  }
537

538
  private static class TypeWatchers<T extends ResourceUpdate> {
539
    // Key is resource name
540
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
541
    final XdsResourceType<T> resourceType;
542

543
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
544
      this.resourceType = resourceType;
1✔
545
    }
1✔
546

547
    public void add(String resourceName, XdsWatcherBase<T> watcher) {
548
      watchers.put(resourceName, watcher);
1✔
549
    }
1✔
550
  }
551

552
  public interface XdsConfigWatcher {
553
    /**
554
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
555
     * INTERNAL.
556
     */
557
    void onUpdate(StatusOr<XdsConfig> config);
558
  }
559

560
  private class ClusterSubscription implements Closeable {
561
    String clusterName;
562

563
    public ClusterSubscription(String clusterName) {
1✔
564
      this.clusterName = clusterName;
1✔
565
    }
1✔
566

567
    public String getClusterName() {
568
      return clusterName;
1✔
569
    }
570

571
    @Override
572
    public void close() throws IOException {
573
      releaseSubscription(this);
1✔
574
    }
1✔
575
  }
576

577
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
578
      implements ResourceWatcher<T> {
579
    private final XdsResourceType<T> type;
580
    private final String resourceName;
581
    boolean cancelled;
582

583
    @Nullable
584
    private StatusOr<T> data;
585

586

587
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
588
      this.type = checkNotNull(type, "type");
1✔
589
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
590
    }
1✔
591

592
    @Override
593
    public void onError(Status error) {
594
      checkNotNull(error, "error");
1✔
595
      if (cancelled) {
1✔
596
        return;
×
597
      }
598
      // Don't update configuration on error, if we've already received configuration
599
      if (!hasDataValue()) {
1✔
600
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
601
            String.format("Error retrieving %s: %s: %s",
1✔
602
              toContextString(), error.getCode(), error.getDescription())));
1✔
603
        maybePublishConfig();
1✔
604
      }
605
    }
1✔
606

607
    @Override
608
    public void onResourceDoesNotExist(String resourceName) {
609
      if (cancelled) {
1✔
610
        return;
1✔
611
      }
612

613
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
614
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
615
          toContextString() + " does not exist" + nodeInfo()));
1✔
616
      maybePublishConfig();
1✔
617
    }
1✔
618

619
    boolean missingResult() {
620
      return data == null;
1✔
621
    }
622

623
    @Nullable
624
    StatusOr<T> getData() {
625
      return data;
1✔
626
    }
627

628
    boolean hasDataValue() {
629
      return data != null && data.hasValue();
1✔
630
    }
631

632
    String resourceName() {
633
      return resourceName;
1✔
634
    }
635

636
    protected void setData(T data) {
637
      checkNotNull(data, "data");
1✔
638
      this.data = StatusOr.fromValue(data);
1✔
639
    }
1✔
640

641
    protected void setDataAsStatus(Status status) {
642
      checkNotNull(status, "status");
1✔
643
      this.data = StatusOr.fromStatus(status);
1✔
644
    }
1✔
645

646
    public String toContextString() {
647
      return toContextStr(type.typeName(), resourceName);
1✔
648
    }
649
  }
650

651
  private interface RdsUpdateSupplier {
652
    StatusOr<RdsUpdate> getRdsUpdate();
653
  }
654

655
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
656
      implements RdsUpdateSupplier {
657
    String rdsName;
658

659
    private LdsWatcher(String resourceName) {
1✔
660
      super(XdsListenerResource.getInstance(), resourceName);
1✔
661
    }
1✔
662

663
    @Override
664
    public void onChanged(XdsListenerResource.LdsUpdate update) {
665
      checkNotNull(update, "update");
1✔
666
      if (cancelled) {
1✔
667
        return;
1✔
668
      }
669

670
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
671
      List<VirtualHost> virtualHosts;
672
      String rdsName;
673
      if (httpConnectionManager == null) {
1✔
674
        // TCP listener. Unsupported config
675
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
676
        rdsName = null;
1✔
677
      } else {
678
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
679
        rdsName = httpConnectionManager.rdsName();
1✔
680
      }
681
      StatusOr<RdsUpdate> activeRdsUpdate = getRouteSource().getRdsUpdate();
1✔
682
      List<VirtualHost> activeVirtualHosts = activeRdsUpdate.hasValue()
1✔
683
          ? activeRdsUpdate.getValue().virtualHosts
1✔
684
          : Collections.emptyList();
1✔
685

686
      boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
1✔
687
      if (changedRdsName) {
1✔
688
        cleanUpRdsWatcher();
1✔
689
      }
690

691
      if (virtualHosts != null) {
1✔
692
        // No RDS watcher since we are getting RDS updates via LDS
693
        updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null);
1✔
694
        this.rdsName = null;
1✔
695
      } else if (changedRdsName) {
1✔
696
        this.rdsName = rdsName;
1✔
697
        addWatcher(new RdsWatcher(rdsName));
1✔
698
        logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
699
      }
700

701
      setData(update);
1✔
702
      maybePublishConfig();
1✔
703
    }
1✔
704

705
    @Override
706
    public void onResourceDoesNotExist(String resourceName) {
707
      if (cancelled) {
1✔
708
        return;
×
709
      }
710

711
      checkArgument(resourceName().equals(resourceName), "Resource name does not match");
1✔
712
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
713
          toContextString() + " does not exist" + nodeInfo()));
1✔
714
      cleanUpRdsWatcher();
1✔
715
      rdsName = null;
1✔
716
      maybePublishConfig();
1✔
717
    }
1✔
718

719
    private void cleanUpRdsWatcher() {
720
      RdsWatcher oldRdsWatcher = getRdsWatcher();
1✔
721
      if (oldRdsWatcher != null) {
1✔
722
        cancelWatcher(oldRdsWatcher);
1✔
723
        logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
1✔
724

725
        // Cleanup clusters (as appropriate) that had the old rds watcher as a parent
726
        if (!oldRdsWatcher.hasDataValue() || resourceWatchers.get(CLUSTER_RESOURCE) == null) {
1✔
727
          return;
×
728
        }
729
        for (XdsWatcherBase<?> watcher :
730
            resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) {
1✔
731
          cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
1✔
732
        }
1✔
733
      }
734
    }
1✔
735

736
    private RdsWatcher getRdsWatcher() {
737
      if (rdsName == null) {
1✔
738
        return null;
1✔
739
      }
740
      TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
741
      if (watchers == null) {
1✔
742
        return null;
×
743
      }
744
      return (RdsWatcher) watchers.watchers.get(rdsName);
1✔
745
    }
746

747
    public RdsUpdateSupplier getRouteSource() {
748
      if (!hasDataValue()) {
1✔
749
        return this;
1✔
750
      }
751
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
752
      if (hcm == null) {
1✔
753
        return this;
1✔
754
      }
755
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
756
      if (virtualHosts != null) {
1✔
757
        return this;
1✔
758
      }
759
      RdsWatcher rdsWatcher = getRdsWatcher();
1✔
760
      assert rdsWatcher != null;
1✔
761
      return rdsWatcher;
1✔
762
    }
763

764
    @Override
765
    public StatusOr<RdsUpdate> getRdsUpdate() {
766
      if (missingResult()) {
1✔
767
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
1✔
768
      }
769
      if (!getData().hasValue()) {
1✔
770
        return StatusOr.fromStatus(getData().getStatus());
1✔
771
      }
772
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
773
      if (hcm == null) {
1✔
774
        return StatusOr.fromStatus(
1✔
775
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
776
      }
777
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
778
      if (virtualHosts == null) {
1✔
779
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
780
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
781
        // bug
782
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
783
      }
784
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
785
    }
786
  }
787

788
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
789

790
    public RdsWatcher(String resourceName) {
1✔
791
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
792
    }
1✔
793

794
    @Override
795
    public void onChanged(RdsUpdate update) {
796
      checkNotNull(update, "update");
1✔
797
      if (cancelled) {
1✔
798
        return;
1✔
799
      }
800
      List<VirtualHost> oldVirtualHosts = hasDataValue()
1✔
801
          ? getData().getValue().virtualHosts
1✔
802
          : Collections.emptyList();
1✔
803
      setData(update);
1✔
804
      updateRoutes(update.virtualHosts, this, oldVirtualHosts, true);
1✔
805
      maybePublishConfig();
1✔
806
    }
1✔
807

808
    @Override
809
    public StatusOr<RdsUpdate> getRdsUpdate() {
810
      if (missingResult()) {
1✔
811
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
812
      }
813
      return getData();
1✔
814
    }
815
  }
816

817
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
818
    Map<Object, Integer> parentContexts = new HashMap<>();
1✔
819

820
    CdsWatcher(String resourceName, Object parentContext, int depth) {
1✔
821
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
822
      this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
1✔
823
    }
1✔
824

825
    @Override
826
    public void onChanged(XdsClusterResource.CdsUpdate update) {
827
      checkNotNull(update, "update");
1✔
828
      if (cancelled) {
1✔
829
        return;
1✔
830
      }
831
      switch (update.clusterType()) {
1✔
832
        case EDS:
833
          setData(update);
1✔
834
          if (!addEdsWatcher(getEdsServiceName(), this))  {
1✔
835
            maybePublishConfig();
1✔
836
          }
837
          break;
838
        case LOGICAL_DNS:
839
          setData(update);
1✔
840
          maybePublishConfig();
1✔
841
          // no eds needed
842
          break;
1✔
843
        case AGGREGATE:
844
          Object parentContext = this;
1✔
845
          int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
1✔
846
          if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
847
            logger.log(XdsLogger.XdsLogLevel.WARNING,
×
848
                "Cluster recursion depth limit exceeded for cluster {0}", resourceName());
×
849
            Status error = Status.UNAVAILABLE.withDescription(
×
850
                "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
851
            setDataAsStatus(error);
×
852
          }
853
          if (hasDataValue()) {
1✔
854
            Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
1✔
855
                ? new HashSet<>(getData().getValue().prioritizedClusterNames())
1✔
856
                : Collections.emptySet();
1✔
857
            Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
1✔
858

859
            Set<String> deletedClusters = Sets.difference(oldNames, newNames);
1✔
860
            deletedClusters.forEach((cluster)
1✔
861
                -> cancelClusterWatcherTree(getCluster(cluster), parentContext));
1✔
862

863
            if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
864
              setData(update);
1✔
865
              Set<String> addedClusters = Sets.difference(newNames, oldNames);
1✔
866
              addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
1✔
867

868
              if (addedClusters.isEmpty()) {
1✔
869
                maybePublishConfig();
×
870
              }
871
            } else { // data was set to error status above
1✔
872
              maybePublishConfig();
×
873
            }
874

875
          } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
876
            setData(update);
1✔
877
            update.prioritizedClusterNames()
1✔
878
                .forEach(name -> addClusterWatcher(name, parentContext, depth));
1✔
879
            maybePublishConfig();
1✔
880
          }
881
          break;
882
        default:
883
          Status error = Status.UNAVAILABLE.withDescription(
×
884
              "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
885
          setDataAsStatus(error);
×
886
          maybePublishConfig();
×
887
      }
888
    }
1✔
889

890
    public String getEdsServiceName() {
891
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
892
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
893
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
894
      if (edsServiceName == null) {
1✔
895
        edsServiceName = cdsUpdate.clusterName();
×
896
      }
897
      return edsServiceName;
1✔
898
    }
899
  }
900

901
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
902
    private final Set<CdsWatcher> parentContexts = new HashSet<>();
1✔
903

904
    private EdsWatcher(String resourceName, CdsWatcher parentContext) {
1✔
905
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
906
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
907
    }
1✔
908

909
    @Override
910
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
911
      if (cancelled) {
1✔
912
        return;
1✔
913
      }
914
      setData(checkNotNull(update, "update"));
1✔
915
      maybePublishConfig();
1✔
916
    }
1✔
917

918
    void addParentContext(CdsWatcher parentContext) {
919
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
920
    }
1✔
921
  }
922
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc