• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19836

30 May 2025 04:00PM UTC coverage: 88.599% (-0.005%) from 88.604%
#19836

push

github

web-flow
xds: Improve shutdown handling of XdsDepManager

The most important change here is to handle subscribeToCluster() calls
after shutdown(), and to prevent the internal state from becoming heavily
confused, since the code assumes there are no watchers after shutdown().

ClusterSubscription.closed isn't strictly necessary, but I don't want
the code to depend on double-deregistration being safe.
maybePublishConfig() isn't being called after shutdown(), but adding the
protection avoids a class of bugs that would cause channel panic.

34793 of 39270 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.52
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.Sets;
26
import io.grpc.InternalLogId;
27
import io.grpc.NameResolver;
28
import io.grpc.Status;
29
import io.grpc.StatusOr;
30
import io.grpc.SynchronizationContext;
31
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
32
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
34
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
35
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
36
import io.grpc.xds.client.XdsClient;
37
import io.grpc.xds.client.XdsClient.ResourceWatcher;
38
import io.grpc.xds.client.XdsLogger;
39
import io.grpc.xds.client.XdsResourceType;
40
import java.io.Closeable;
41
import java.io.IOException;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.HashSet;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Objects;
48
import java.util.Set;
49
import java.util.concurrent.ScheduledExecutorService;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
55
 * maintains the watchers for the xds resources and when an update is received, it either requests
56
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
57
 * applies to a single data plane authority.
58
 */
59
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
60
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
61
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
62
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
63
  private final XdsClient xdsClient;
64
  private final XdsConfigWatcher xdsConfigWatcher;
65
  private final SynchronizationContext syncContext;
66
  private final String dataPlaneAuthority;
67

68
  private final InternalLogId logId;
69
  private final XdsLogger logger;
70
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
71
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
72

73
  /**
   * Creates a dependency manager and schedules the initial listener (LDS) watch.
   *
   * <p>Every argument except {@code listenerName} is null-checked here. The listener watcher is
   * registered from inside {@code syncContext} rather than directly on the calling thread.
   */
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
                       SynchronizationContext syncContext, String dataPlaneAuthority,
                       String listenerName, NameResolver.Args nameResolverArgs,
                       ScheduledExecutorService scheduler) {
    this.logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
    this.logger = XdsLogger.withLogId(this.logId);
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");

    // These two are validated but not stored by this constructor.
    checkNotNull(nameResolverArgs, "nameResolverArgs");
    checkNotNull(scheduler, "scheduler");

    // Start the ball rolling by watching the listener resource.
    this.syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
  }
1✔
89

90
  public static String toContextStr(String typeName, String resourceName) {
91
    return typeName + " resource " + resourceName;
1✔
92
  }
93

94
  @Override
95
  public Closeable subscribeToCluster(String clusterName) {
96
    checkNotNull(clusterName, "clusterName");
1✔
97
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
98

99
    syncContext.execute(() -> {
1✔
100
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
101
        subscription.closed = true;
1✔
102
        return; // shutdown() called
1✔
103
      }
104
      addClusterWatcher(clusterName, subscription, 1);
1✔
105
    });
1✔
106

107
    return subscription;
1✔
108
  }
109

110
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
111
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
112
    XdsResourceType<T> type = watcher.type;
1✔
113
    String resourceName = watcher.resourceName;
1✔
114

115
    @SuppressWarnings("unchecked")
116
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
117
    if (typeWatchers == null) {
1✔
118
      typeWatchers = new TypeWatchers<>(type);
1✔
119
      resourceWatchers.put(type, typeWatchers);
1✔
120
    }
121

122
    typeWatchers.add(resourceName, watcher);
1✔
123
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
124
  }
1✔
125

126
  private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
127
    if (watcher == null) {
1✔
128
      return;
×
129
    }
130
    watcher.parentContexts.remove(parentContext);
1✔
131
    if (watcher.parentContexts.isEmpty()) {
1✔
132
      cancelWatcher(watcher);
1✔
133
    }
134
  }
1✔
135

136
  private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
137
    if (watcher == null) {
1✔
138
      return;
×
139
    }
140
    watcher.parentContexts.remove(parentContext);
1✔
141
    if (watcher.parentContexts.isEmpty()) {
1✔
142
      cancelWatcher(watcher);
1✔
143
    }
144
  }
1✔
145

146
  private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
147
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
148

149
    if (watcher == null) {
1✔
150
      return;
×
151
    }
152

153
    if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
1✔
154
      throwIfParentContextsNotEmpty(watcher);
1✔
155
    }
156

157
    watcher.cancelled = true;
1✔
158
    XdsResourceType<T> type = watcher.type;
1✔
159
    String resourceName = watcher.resourceName;
1✔
160

161
    @SuppressWarnings("unchecked")
162
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
163
    if (typeWatchers == null) {
1✔
164
      logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
×
165
      return;
×
166
    }
167

168
    typeWatchers.watchers.remove(resourceName);
1✔
169
    xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
1✔
170

171
  }
1✔
172

173
  private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
174
    if (watcher instanceof CdsWatcher) {
1✔
175
      CdsWatcher cdsWatcher = (CdsWatcher) watcher;
1✔
176
      if (!cdsWatcher.parentContexts.isEmpty()) {
1✔
177
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
178
            cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
×
179
        throw new IllegalStateException(msg);
×
180
      }
181
    } else if (watcher instanceof EdsWatcher) {
1✔
182
      EdsWatcher edsWatcher = (EdsWatcher) watcher;
1✔
183
      if (!edsWatcher.parentContexts.isEmpty()) {
1✔
184
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
185
            edsWatcher.resourceName(), edsWatcher.parentContexts);
×
186
        throw new IllegalStateException(msg);
×
187
      }
188
    }
189
  }
1✔
190

191
  public void shutdown() {
192
    syncContext.execute(() -> {
1✔
193
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
194
        shutdownWatchersForType(watchers);
1✔
195
      }
1✔
196
      resourceWatchers.clear();
1✔
197
    });
1✔
198
  }
1✔
199

200
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
201
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
202
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
203
          watcherEntry.getValue());
1✔
204
      watcherEntry.getValue().cancelled = true;
1✔
205
    }
1✔
206
  }
1✔
207

208
  private void releaseSubscription(ClusterSubscription subscription) {
209
    checkNotNull(subscription, "subscription");
1✔
210
    String clusterName = subscription.getClusterName();
1✔
211
    syncContext.execute(() -> {
1✔
212
      if (subscription.closed) {
1✔
213
        return;
1✔
214
      }
215
      subscription.closed = true;
1✔
216
      XdsWatcherBase<?> cdsWatcher =
1✔
217
          resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
218
      if (cdsWatcher == null) {
1✔
219
        return; // shutdown() called
×
220
      }
221
      cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
1✔
222
      maybePublishConfig();
1✔
223
    });
1✔
224
  }
1✔
225

226
  private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
227
    checkNotNull(root, "root");
1✔
228

229
    cancelCdsWatcher(root, parentContext);
1✔
230

231
    if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
1✔
232
      return;
1✔
233
    }
234

235
    XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
1✔
236
    switch (cdsUpdate.clusterType()) {
1✔
237
      case EDS:
238
        String edsServiceName = root.getEdsServiceName();
1✔
239
        EdsWatcher edsWatcher =
1✔
240
            (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
1✔
241
        cancelEdsWatcher(edsWatcher, root);
1✔
242
        break;
1✔
243
      case AGGREGATE:
244
        for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
245
          CdsWatcher clusterWatcher =
1✔
246
              (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
1✔
247
          if (clusterWatcher != null) {
1✔
248
            cancelClusterWatcherTree(clusterWatcher, root);
1✔
249
          }
250
        }
1✔
251
        break;
1✔
252
      case LOGICAL_DNS:
253
        // no eds needed, so everything happens in cancelCdsWatcher()
254
        break;
×
255
      default:
256
        throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
×
257
    }
258
  }
1✔
259

260
  /**
261
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
262
   * the watchers.
263
   */
264
  private void maybePublishConfig() {
265
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
266
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
267
      return; // shutdown() called
×
268
    }
269
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
270
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
271
        .anyMatch(XdsWatcherBase::missingResult);
1✔
272
    if (waitingOnResource) {
1✔
273
      return;
1✔
274
    }
275

276
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
277
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
278
      return;
1✔
279
    }
280
    assert newUpdate.hasValue()
1✔
281
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
282
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
283
    lastUpdate = newUpdate;
1✔
284
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
285
  }
1✔
286

287
  @VisibleForTesting
288
  StatusOr<XdsConfig> buildUpdate() {
289
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
290

291
    // Iterate watchers and build the XdsConfig
292

293
    // Will only be 1 listener and 1 route resource
294
    RdsUpdateSupplier routeSource = null;
1✔
295
    for (XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher :
296
        getWatchers(XdsListenerResource.getInstance()).values()) {
1✔
297
      if (!ldsWatcher.getData().hasValue()) {
1✔
298
        return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
299
      }
300
      XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
301
      builder.setListener(ldsUpdate);
1✔
302
      routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
1✔
303
    }
1✔
304

305
    if (routeSource == null) {
1✔
306
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
307
          "Bug: No route source found for listener " + dataPlaneAuthority));
308
    }
309

310
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
311
    if (!statusOrRdsUpdate.hasValue()) {
1✔
312
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
313
    }
314
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
315
    builder.setRoute(rdsUpdate);
1✔
316

317
    VirtualHost activeVirtualHost =
1✔
318
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
319
    if (activeVirtualHost == null) {
1✔
320
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
321
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
322
    }
323
    builder.setVirtualHost(activeVirtualHost);
1✔
324

325
    Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers =
1✔
326
        getWatchers(ENDPOINT_RESOURCE);
1✔
327
    Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers =
1✔
328
        getWatchers(CLUSTER_RESOURCE);
1✔
329

330
    // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
331
    List<String> topLevelClusters =
1✔
332
        cdsWatchers.values().stream()
1✔
333
            .filter(XdsDependencyManager::isTopLevelCluster)
1✔
334
            .map(XdsWatcherBase<?>::resourceName)
1✔
335
            .distinct()
1✔
336
            .collect(Collectors.toList());
1✔
337

338
    // Flatten multi-level aggregates into lists of leaf clusters
339
    Set<String> leafNames =
1✔
340
        addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
1✔
341

342
    addLeavesToBuilder(builder, edsWatchers, leafNames);
1✔
343

344
    return StatusOr.fromValue(builder.build());
1✔
345
  }
346

347
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
348
      XdsResourceType<T> resourceType) {
349
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
350
    if (typeWatchers == null) {
1✔
351
      return Collections.emptyMap();
1✔
352
    }
353
    assert typeWatchers.resourceType == resourceType;
1✔
354
    @SuppressWarnings("unchecked")
355
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
356
    return tTypeWatchers.watchers;
1✔
357
  }
358

359
  private void addLeavesToBuilder(
360
      XdsConfig.XdsConfigBuilder builder,
361
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
362
      Set<String> leafNames) {
363
    for (String clusterName : leafNames) {
1✔
364
      CdsWatcher cdsWatcher = getCluster(clusterName);
1✔
365
      StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
1✔
366

367
      if (!cdsUpdateOr.hasValue()) {
1✔
368
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
1✔
369
        continue;
1✔
370
      }
371

372
      XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
1✔
373
      if (cdsUpdate.clusterType() == ClusterType.EDS) {
1✔
374
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
375
            edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
376
        EndpointConfig child;
377
        if (edsWatcher != null) {
1✔
378
          child = new EndpointConfig(edsWatcher.getData());
1✔
379
        } else {
380
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
381
              "EDS resource not found for cluster " + clusterName)));
382
        }
383
        builder.addCluster(clusterName, StatusOr.fromValue(
1✔
384
            new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
385
      } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
1✔
386
        builder.addCluster(clusterName, StatusOr.fromStatus(
×
387
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
×
388
      }
389
    }
1✔
390
  }
1✔
391

392
  // Adds the top-level clusters to the builder and returns the leaf cluster names
393
  private Set<String> addTopLevelClustersToBuilder(
394
      XdsConfig.XdsConfigBuilder builder,
395
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
396
      Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers,
397
      List<String> topLevelClusters) {
398

399
    Set<String> leafClusterNames = new HashSet<>();
1✔
400
    for (String clusterName : topLevelClusters) {
1✔
401
      CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
1✔
402
      StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
403
      if (!cdsWatcher.hasDataValue()) {
1✔
404
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
405
        continue;
1✔
406
      }
407

408
      XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
409
      XdsConfig.XdsClusterConfig.ClusterChild child;
410
      switch (cdsUpdate.clusterType()) {
1✔
411
        case AGGREGATE:
412
          Set<String> leafNames = new HashSet<>();
1✔
413
          addLeafNames(leafNames, cdsUpdate);
1✔
414
          child = new AggregateConfig(leafNames);
1✔
415
          leafClusterNames.addAll(leafNames);
1✔
416
          break;
1✔
417
        case EDS:
418
          XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
419
              edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
420
          if (edsWatcher != null) {
1✔
421
            child = new EndpointConfig(edsWatcher.getData());
1✔
422
          } else {
423
            child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
424
                "EDS resource not found for cluster " + clusterName)));
425
          }
426
          break;
×
427
        case LOGICAL_DNS:
428
          // TODO get the resolved endpoint configuration
429
          child = new EndpointConfig(StatusOr.fromStatus(
1✔
430
              Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
431
          break;
1✔
432
        default:
433
          throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
434
      }
435
      builder.addCluster(clusterName, StatusOr.fromValue(
1✔
436
          new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
437
    }
1✔
438

439
    return leafClusterNames;
1✔
440
  }
441

442
  private void addLeafNames(Set<String> leafNames, XdsClusterResource.CdsUpdate cdsUpdate) {
443
    for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
444
      if (leafNames.contains(cluster)) {
1✔
445
        continue;
1✔
446
      }
447
      StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
1✔
448
      if (data == null || !data.hasValue() || data.getValue() == null) {
1✔
449
        leafNames.add(cluster);
1✔
450
        continue;
1✔
451
      }
452
      if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
1✔
453
        addLeafNames(leafNames, data.getValue());
1✔
454
      } else {
455
        leafNames.add(cluster);
1✔
456
      }
457
    }
1✔
458
  }
1✔
459

460
  private static boolean isTopLevelCluster(
461
      XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher) {
462
    return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
1✔
463
        .anyMatch(depth -> depth == 1);
1✔
464
  }
465

466
  @Override
467
  public String toString() {
468
    return logId.toString();
×
469
  }
470

471
  // Returns true if the watcher was added, false if it already exists
472
  private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
473
    TypeWatchers<?> typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance());
1✔
474
    if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) {
1✔
475
      addWatcher(new EdsWatcher(edsServiceName, parentContext));
1✔
476
      return true;
1✔
477
    }
478

479
    EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName);
1✔
480
    watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
1✔
481
    return false;
1✔
482
  }
483

484
  private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
485
    TypeWatchers<?> clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
1✔
486
    if (clusterWatchers != null) {
1✔
487
      CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName);
1✔
488
      if (watcher != null) {
1✔
489
        watcher.parentContexts.put(parentContext, depth);
1✔
490
        return;
1✔
491
      }
492
    }
493

494
    addWatcher(new CdsWatcher(clusterName, parentContext, depth));
1✔
495
  }
1✔
496

497
  private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
498
                            List<VirtualHost> oldVirtualHosts, boolean sameParentContext) {
499
    VirtualHost oldVirtualHost =
1✔
500
        RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority);
1✔
501
    VirtualHost virtualHost =
1✔
502
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
503

504
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
505
    Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
1✔
506

507
    if (sameParentContext) {
1✔
508
      // Calculate diffs.
509
      Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
1✔
510
      Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
1✔
511

512
      deletedClusters.forEach(watcher ->
1✔
513
          cancelClusterWatcherTree(getCluster(watcher), newParentContext));
1✔
514
      addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
515
    } else {
1✔
516
      newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
517
    }
518
  }
1✔
519

520
  private String nodeInfo() {
521
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
522
  }
523

524
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
525
    if (virtualHost == null) {
1✔
526
      return Collections.emptySet();
1✔
527
    }
528

529
    // Get all cluster names to which requests can be routed through the virtual host.
530
    Set<String> clusters = new HashSet<>();
1✔
531
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
532
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
533
      if (action == null) {
1✔
534
        continue;
1✔
535
      }
536
      if (action.cluster() != null) {
1✔
537
        clusters.add(action.cluster());
1✔
538
      } else if (action.weightedClusters() != null) {
1✔
539
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
540
          clusters.add(weighedCluster.name());
1✔
541
        }
1✔
542
      }
543
    }
1✔
544

545
    return clusters;
1✔
546
  }
547

548
  private CdsWatcher getCluster(String clusterName) {
549
    return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
550
  }
551

552
  private static class TypeWatchers<T extends ResourceUpdate> {
553
    // Key is resource name
554
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
555
    final XdsResourceType<T> resourceType;
556

557
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
558
      this.resourceType = resourceType;
1✔
559
    }
1✔
560

561
    public void add(String resourceName, XdsWatcherBase<T> watcher) {
562
      watchers.put(resourceName, watcher);
1✔
563
    }
1✔
564
  }
565

566
  public interface XdsConfigWatcher {
567
    /**
568
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
569
     * INTERNAL.
570
     */
571
    void onUpdate(StatusOr<XdsConfig> config);
572
  }
573

574
  private final class ClusterSubscription implements Closeable {
575
    private final String clusterName;
576
    boolean closed; // Accessed from syncContext
577

578
    public ClusterSubscription(String clusterName) {
1✔
579
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
580
    }
1✔
581

582
    String getClusterName() {
583
      return clusterName;
1✔
584
    }
585

586
    @Override
587
    public void close() throws IOException {
588
      releaseSubscription(this);
1✔
589
    }
1✔
590
  }
591

592
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
593
      implements ResourceWatcher<T> {
594
    private final XdsResourceType<T> type;
595
    private final String resourceName;
596
    boolean cancelled;
597

598
    @Nullable
599
    private StatusOr<T> data;
600

601

602
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
603
      this.type = checkNotNull(type, "type");
1✔
604
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
605
    }
1✔
606

607
    @Override
608
    public void onError(Status error) {
609
      checkNotNull(error, "error");
1✔
610
      if (cancelled) {
1✔
611
        return;
×
612
      }
613
      // Don't update configuration on error, if we've already received configuration
614
      if (!hasDataValue()) {
1✔
615
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
616
            String.format("Error retrieving %s: %s: %s",
1✔
617
              toContextString(), error.getCode(), error.getDescription())));
1✔
618
        maybePublishConfig();
1✔
619
      }
620
    }
1✔
621

622
    @Override
623
    public void onResourceDoesNotExist(String resourceName) {
624
      if (cancelled) {
1✔
625
        return;
1✔
626
      }
627

628
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
629
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
630
          toContextString() + " does not exist" + nodeInfo()));
1✔
631
      maybePublishConfig();
1✔
632
    }
1✔
633

634
    boolean missingResult() {
635
      return data == null;
1✔
636
    }
637

638
    @Nullable
639
    StatusOr<T> getData() {
640
      return data;
1✔
641
    }
642

643
    boolean hasDataValue() {
644
      return data != null && data.hasValue();
1✔
645
    }
646

647
    String resourceName() {
648
      return resourceName;
1✔
649
    }
650

651
    protected void setData(T data) {
652
      checkNotNull(data, "data");
1✔
653
      this.data = StatusOr.fromValue(data);
1✔
654
    }
1✔
655

656
    protected void setDataAsStatus(Status status) {
657
      checkNotNull(status, "status");
1✔
658
      this.data = StatusOr.fromStatus(status);
1✔
659
    }
1✔
660

661
    public String toContextString() {
662
      return toContextStr(type.typeName(), resourceName);
1✔
663
    }
664
  }
665

666
  private interface RdsUpdateSupplier {
667
    StatusOr<RdsUpdate> getRdsUpdate();
668
  }
669

670
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
671
      implements RdsUpdateSupplier {
672
    String rdsName;
673

674
    /** Creates a watcher for the listener resource named {@code resourceName}. */
    private LdsWatcher(String resourceName) {
      super(XdsListenerResource.getInstance(), resourceName);
    }
1✔
677

678
    @Override
679
    public void onChanged(XdsListenerResource.LdsUpdate update) {
680
      checkNotNull(update, "update");
1✔
681
      if (cancelled) {
1✔
682
        return;
1✔
683
      }
684

685
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
686
      List<VirtualHost> virtualHosts;
687
      String rdsName;
688
      if (httpConnectionManager == null) {
1✔
689
        // TCP listener. Unsupported config
690
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
691
        rdsName = null;
1✔
692
      } else {
693
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
694
        rdsName = httpConnectionManager.rdsName();
1✔
695
      }
696
      StatusOr<RdsUpdate> activeRdsUpdate = getRouteSource().getRdsUpdate();
1✔
697
      List<VirtualHost> activeVirtualHosts = activeRdsUpdate.hasValue()
1✔
698
          ? activeRdsUpdate.getValue().virtualHosts
1✔
699
          : Collections.emptyList();
1✔
700

701
      boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
1✔
702
      if (changedRdsName) {
1✔
703
        cleanUpRdsWatcher();
1✔
704
      }
705

706
      if (virtualHosts != null) {
1✔
707
        // No RDS watcher since we are getting RDS updates via LDS
708
        updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null);
1✔
709
        this.rdsName = null;
1✔
710
      } else if (changedRdsName) {
1✔
711
        this.rdsName = rdsName;
1✔
712
        addWatcher(new RdsWatcher(rdsName));
1✔
713
        logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
714
      }
715

716
      setData(update);
1✔
717
      maybePublishConfig();
1✔
718
    }
1✔
719

720
    @Override
721
    public void onResourceDoesNotExist(String resourceName) {
722
      if (cancelled) {
1✔
723
        return;
×
724
      }
725

726
      checkArgument(resourceName().equals(resourceName), "Resource name does not match");
1✔
727
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
728
          toContextString() + " does not exist" + nodeInfo()));
1✔
729
      cleanUpRdsWatcher();
1✔
730
      rdsName = null;
1✔
731
      maybePublishConfig();
1✔
732
    }
1✔
733

734
    private void cleanUpRdsWatcher() {
735
      RdsWatcher oldRdsWatcher = getRdsWatcher();
1✔
736
      if (oldRdsWatcher != null) {
1✔
737
        cancelWatcher(oldRdsWatcher);
1✔
738
        logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
1✔
739

740
        // Cleanup clusters (as appropriate) that had the old rds watcher as a parent
741
        if (!oldRdsWatcher.hasDataValue() || resourceWatchers.get(CLUSTER_RESOURCE) == null) {
1✔
742
          return;
×
743
        }
744
        for (XdsWatcherBase<?> watcher :
745
            resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) {
1✔
746
          cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
1✔
747
        }
1✔
748
      }
749
    }
1✔
750

751
    private RdsWatcher getRdsWatcher() {
752
      if (rdsName == null) {
1✔
753
        return null;
1✔
754
      }
755
      TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
756
      if (watchers == null) {
1✔
757
        return null;
×
758
      }
759
      return (RdsWatcher) watchers.watchers.get(rdsName);
1✔
760
    }
761

762
    public RdsUpdateSupplier getRouteSource() {
763
      if (!hasDataValue()) {
1✔
764
        return this;
1✔
765
      }
766
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
767
      if (hcm == null) {
1✔
768
        return this;
1✔
769
      }
770
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
771
      if (virtualHosts != null) {
1✔
772
        return this;
1✔
773
      }
774
      RdsWatcher rdsWatcher = getRdsWatcher();
1✔
775
      assert rdsWatcher != null;
1✔
776
      return rdsWatcher;
1✔
777
    }
778

779
    @Override
780
    public StatusOr<RdsUpdate> getRdsUpdate() {
781
      if (missingResult()) {
1✔
782
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
1✔
783
      }
784
      if (!getData().hasValue()) {
1✔
785
        return StatusOr.fromStatus(getData().getStatus());
1✔
786
      }
787
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
788
      if (hcm == null) {
1✔
789
        return StatusOr.fromStatus(
1✔
790
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
791
      }
792
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
793
      if (virtualHosts == null) {
1✔
794
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
795
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
796
        // bug
797
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
798
      }
799
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
800
    }
801
  }
802

803
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
804

805
    public RdsWatcher(String resourceName) {
1✔
806
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
807
    }
1✔
808

809
    @Override
810
    public void onChanged(RdsUpdate update) {
811
      checkNotNull(update, "update");
1✔
812
      if (cancelled) {
1✔
813
        return;
1✔
814
      }
815
      List<VirtualHost> oldVirtualHosts = hasDataValue()
1✔
816
          ? getData().getValue().virtualHosts
1✔
817
          : Collections.emptyList();
1✔
818
      setData(update);
1✔
819
      updateRoutes(update.virtualHosts, this, oldVirtualHosts, true);
1✔
820
      maybePublishConfig();
1✔
821
    }
1✔
822

823
    @Override
824
    public StatusOr<RdsUpdate> getRdsUpdate() {
825
      if (missingResult()) {
1✔
826
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
827
      }
828
      return getData();
1✔
829
    }
830
  }
831

832
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
833
    Map<Object, Integer> parentContexts = new HashMap<>();
1✔
834

835
    CdsWatcher(String resourceName, Object parentContext, int depth) {
1✔
836
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
837
      this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
1✔
838
    }
1✔
839

840
    @Override
841
    public void onChanged(XdsClusterResource.CdsUpdate update) {
842
      checkNotNull(update, "update");
1✔
843
      if (cancelled) {
1✔
844
        return;
1✔
845
      }
846
      switch (update.clusterType()) {
1✔
847
        case EDS:
848
          setData(update);
1✔
849
          if (!addEdsWatcher(getEdsServiceName(), this))  {
1✔
850
            maybePublishConfig();
1✔
851
          }
852
          break;
853
        case LOGICAL_DNS:
854
          setData(update);
1✔
855
          maybePublishConfig();
1✔
856
          // no eds needed
857
          break;
1✔
858
        case AGGREGATE:
859
          Object parentContext = this;
1✔
860
          int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
1✔
861
          if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
862
            logger.log(XdsLogger.XdsLogLevel.WARNING,
×
863
                "Cluster recursion depth limit exceeded for cluster {0}", resourceName());
×
864
            Status error = Status.UNAVAILABLE.withDescription(
×
865
                "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
866
            setDataAsStatus(error);
×
867
          }
868
          if (hasDataValue()) {
1✔
869
            Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
1✔
870
                ? new HashSet<>(getData().getValue().prioritizedClusterNames())
1✔
871
                : Collections.emptySet();
1✔
872
            Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
1✔
873

874
            Set<String> deletedClusters = Sets.difference(oldNames, newNames);
1✔
875
            deletedClusters.forEach((cluster)
1✔
876
                -> cancelClusterWatcherTree(getCluster(cluster), parentContext));
1✔
877

878
            if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
879
              setData(update);
1✔
880
              Set<String> addedClusters = Sets.difference(newNames, oldNames);
1✔
881
              addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
1✔
882

883
              if (addedClusters.isEmpty()) {
1✔
884
                maybePublishConfig();
×
885
              }
886
            } else { // data was set to error status above
1✔
887
              maybePublishConfig();
×
888
            }
889

890
          } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
891
            setData(update);
1✔
892
            update.prioritizedClusterNames()
1✔
893
                .forEach(name -> addClusterWatcher(name, parentContext, depth));
1✔
894
            maybePublishConfig();
1✔
895
          }
896
          break;
897
        default:
898
          Status error = Status.UNAVAILABLE.withDescription(
×
899
              "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
900
          setDataAsStatus(error);
×
901
          maybePublishConfig();
×
902
      }
903
    }
1✔
904

905
    public String getEdsServiceName() {
906
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
907
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
908
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
909
      if (edsServiceName == null) {
1✔
910
        edsServiceName = cdsUpdate.clusterName();
×
911
      }
912
      return edsServiceName;
1✔
913
    }
914
  }
915

916
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
917
    private final Set<CdsWatcher> parentContexts = new HashSet<>();
1✔
918

919
    private EdsWatcher(String resourceName, CdsWatcher parentContext) {
1✔
920
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
921
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
922
    }
1✔
923

924
    @Override
925
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
926
      if (cancelled) {
1✔
927
        return;
1✔
928
      }
929
      setData(checkNotNull(update, "update"));
1✔
930
      maybePublishConfig();
1✔
931
    }
1✔
932

933
    void addParentContext(CdsWatcher parentContext) {
934
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
935
    }
1✔
936
  }
937
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc