• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19862

13 Jun 2025 03:18PM UTC coverage: 88.578% (+0.006%) from 88.572%
#19862

push

github

ejona86
xds: Avoid changing cache when watching children in XdsDepManager

The watchers can be completely regular, so the base class can do the
cache management while the subclasses are only concerned with
subscribing to children.

34564 of 39021 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.26
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import io.grpc.NameResolver;
27
import io.grpc.Status;
28
import io.grpc.StatusOr;
29
import io.grpc.SynchronizationContext;
30
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
31
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
32
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
34
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
35
import io.grpc.xds.client.XdsClient;
36
import io.grpc.xds.client.XdsClient.ResourceWatcher;
37
import io.grpc.xds.client.XdsResourceType;
38
import java.util.Collections;
39
import java.util.HashMap;
40
import java.util.HashSet;
41
import java.util.LinkedHashSet;
42
import java.util.List;
43
import java.util.Map;
44
import java.util.Objects;
45
import java.util.Set;
46
import java.util.concurrent.ScheduledExecutorService;
47
import javax.annotation.Nullable;
48

49
/**
50
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
51
 * maintains the watchers for the xds resources and when an update is received, it either requests
52
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
53
 * applies to a single data plane authority.
54
 */
55
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
56
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
57
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
58
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
59
  private final String listenerName;
60
  private final XdsClient xdsClient;
61
  private final SynchronizationContext syncContext;
62
  private final String dataPlaneAuthority;
63
  private XdsConfigWatcher xdsConfigWatcher;
64

65
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
66
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
67
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
68

69
  XdsDependencyManager(XdsClient xdsClient,
70
                       SynchronizationContext syncContext, String dataPlaneAuthority,
71
                       String listenerName, NameResolver.Args nameResolverArgs,
72
                       ScheduledExecutorService scheduler) {
1✔
73
    this.listenerName = checkNotNull(listenerName, "listenerName");
1✔
74
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
75
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
76
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
77
    checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
78
    checkNotNull(scheduler, "scheduler");
1✔
79
  }
1✔
80

81
  public static String toContextStr(String typeName, String resourceName) {
82
    return typeName + " resource " + resourceName;
1✔
83
  }
84

85
  public void start(XdsConfigWatcher xdsConfigWatcher) {
86
    checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
1✔
87
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
88
    // start the ball rolling
89
    syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
1✔
90
  }
1✔
91

92
  @Override
93
  public XdsConfig.Subscription subscribeToCluster(String clusterName) {
94
    checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
1✔
95
    checkNotNull(clusterName, "clusterName");
1✔
96
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
97

98
    syncContext.execute(() -> {
1✔
99
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
100
        subscription.closed = true;
1✔
101
        return; // shutdown() called
1✔
102
      }
103
      subscriptions.add(subscription);
1✔
104
      addClusterWatcher(clusterName);
1✔
105
    });
1✔
106

107
    return subscription;
1✔
108
  }
109

110
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
111
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
112
    XdsResourceType<T> type = watcher.type;
1✔
113
    String resourceName = watcher.resourceName;
1✔
114

115
    getWatchers(type).put(resourceName, watcher);
1✔
116
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
117
  }
1✔
118

119
  public void shutdown() {
120
    syncContext.execute(() -> {
1✔
121
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
122
        shutdownWatchersForType(watchers);
1✔
123
      }
1✔
124
      resourceWatchers.clear();
1✔
125
      subscriptions.clear();
1✔
126
    });
1✔
127
  }
1✔
128

129
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
130
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
131
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
132
          watcherEntry.getValue());
1✔
133
      watcherEntry.getValue().cancelled = true;
1✔
134
    }
1✔
135
  }
1✔
136

137
  private void releaseSubscription(ClusterSubscription subscription) {
138
    checkNotNull(subscription, "subscription");
1✔
139
    syncContext.execute(() -> {
1✔
140
      if (subscription.closed) {
1✔
141
        return;
1✔
142
      }
143
      subscription.closed = true;
1✔
144
      if (!subscriptions.remove(subscription)) {
1✔
145
        return; // shutdown() called
×
146
      }
147
      maybePublishConfig();
1✔
148
    });
1✔
149
  }
1✔
150

151
  /**
152
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
153
   * the watchers.
154
   */
155
  private void maybePublishConfig() {
156
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
157
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
158
      return; // shutdown() called
×
159
    }
160
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
161
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
162
        .anyMatch(XdsWatcherBase::missingResult);
1✔
163
    if (waitingOnResource) {
1✔
164
      return;
1✔
165
    }
166

167
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
168
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
169
      return;
1✔
170
    }
171
    assert newUpdate.hasValue()
1✔
172
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
173
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
174
    lastUpdate = newUpdate;
1✔
175
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
176
  }
1✔
177

178
  @VisibleForTesting
179
  StatusOr<XdsConfig> buildUpdate() {
180
    // Create a config and discard any watchers not accessed
181
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
182
    StatusOr<XdsConfig> config = buildUpdate(
1✔
183
        tracer, listenerName, dataPlaneAuthority, subscriptions);
184
    tracer.closeUnusedWatchers();
1✔
185
    return config;
1✔
186
  }
187

188
  private static StatusOr<XdsConfig> buildUpdate(
189
      WatcherTracer tracer,
190
      String listenerName,
191
      String dataPlaneAuthority,
192
      Set<ClusterSubscription> subscriptions) {
193
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
194

195
    // Iterate watchers and build the XdsConfig
196

197
    XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
198
        = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
1✔
199
    if (ldsWatcher == null) {
1✔
200
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
201
          "Bug: No listener watcher found for " + listenerName));
202
    }
203
    if (!ldsWatcher.getData().hasValue()) {
1✔
204
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
205
    }
206
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
207
    builder.setListener(ldsUpdate);
1✔
208

209
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
210
    if (routeSource == null) {
1✔
211
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
212
          "Bug: No route source found for listener " + dataPlaneAuthority));
213
    }
214
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
215
    if (!statusOrRdsUpdate.hasValue()) {
1✔
216
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
217
    }
218
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
219
    builder.setRoute(rdsUpdate);
1✔
220

221
    VirtualHost activeVirtualHost =
1✔
222
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
223
    if (activeVirtualHost == null) {
1✔
224
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
225
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
226
    }
227
    builder.setVirtualHost(activeVirtualHost);
1✔
228

229
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
230
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
231
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
232
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
233
    }
1✔
234
    for (ClusterSubscription subscription : subscriptions) {
1✔
235
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
236
    }
1✔
237
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
238
      builder.addCluster(me.getKey(), me.getValue());
1✔
239
    }
1✔
240

241
    return StatusOr.fromValue(builder.build());
1✔
242
  }
243

244
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
245
      XdsResourceType<T> resourceType) {
246
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
247
    if (typeWatchers == null) {
1✔
248
      typeWatchers = new TypeWatchers<T>(resourceType);
1✔
249
      resourceWatchers.put(resourceType, typeWatchers);
1✔
250
    }
251
    assert typeWatchers.resourceType == resourceType;
1✔
252
    @SuppressWarnings("unchecked")
253
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
254
    return tTypeWatchers.watchers;
1✔
255
  }
256

257
  private static void addConfigForCluster(
258
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
259
      String clusterName,
260
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
261
      LinkedHashSet<String> ancestors,
262
      WatcherTracer tracer) {
263
    if (clusters.containsKey(clusterName)) {
1✔
264
      return;
1✔
265
    }
266
    if (ancestors.contains(clusterName)) {
1✔
267
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
268
          Status.INTERNAL.withDescription(
1✔
269
              "Aggregate cluster cycle detected: " + ancestors)));
270
      return;
1✔
271
    }
272
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
273
      clusters.put(clusterName, StatusOr.fromStatus(
×
274
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
275
      return;
×
276
    }
277

278
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
1✔
279
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
280
    if (!cdsWatcherDataOr.hasValue()) {
1✔
281
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
282
      return;
1✔
283
    }
284

285
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
286
    XdsConfig.XdsClusterConfig.ClusterChild child;
287
    switch (cdsUpdate.clusterType()) {
1✔
288
      case AGGREGATE:
289
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
290
        // preserves the priority across all aggregate clusters
291
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
292
        ancestors.add(clusterName);
1✔
293
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
294
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
295
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
296
          if (!config.hasValue()) {
1✔
297
            // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
298
            // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
299
            // watchers reports a transient ADS stream error, the policy should report that it is in
300
            // TRANSIENT_FAILURE if it has never passed a config to its child.
301
            //
302
            // But there's currently disagreement about whether that is actually what we want, and
303
            // that was not originally implemented in gRPC Java. So we're keeping Java's old
304
            // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
305
            // cycle).
306
            leafNames.add(childCluster);
1✔
307
            continue;
1✔
308
          }
309
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
310
          if (children instanceof AggregateConfig) {
1✔
311
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
312
          } else {
313
            leafNames.add(childCluster);
1✔
314
          }
315
        }
1✔
316
        ancestors.remove(clusterName);
1✔
317

318
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
319
        break;
1✔
320
      case EDS:
321
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
322
            tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
1✔
323
        if (edsWatcher != null) {
1✔
324
          child = new EndpointConfig(edsWatcher.getData());
1✔
325
        } else {
326
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
327
              "EDS resource not found for cluster " + clusterName)));
328
        }
329
        break;
×
330
      case LOGICAL_DNS:
331
        // TODO get the resolved endpoint configuration
332
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
333
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
334
        break;
1✔
335
      default:
336
        child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
337
              "Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
×
338
    }
339
    if (clusters.containsKey(clusterName)) {
1✔
340
      // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
341
      // present. We don't want to overwrite it with a non-error value.
342
      return;
1✔
343
    }
344
    clusters.put(clusterName, StatusOr.fromValue(
1✔
345
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
346
  }
1✔
347

348
  private void addRdsWatcher(String resourceName) {
349
    if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
1✔
350
      return;
×
351
    }
352

353
    addWatcher(new RdsWatcher(resourceName));
1✔
354
  }
1✔
355

356
  private void addEdsWatcher(String edsServiceName) {
357
    if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
1✔
358
      return;
1✔
359
    }
360

361
    addWatcher(new EdsWatcher(edsServiceName));
1✔
362
  }
1✔
363

364
  private void addClusterWatcher(String clusterName) {
365
    if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
1✔
366
      return;
1✔
367
    }
368

369
    addWatcher(new CdsWatcher(clusterName));
1✔
370
  }
1✔
371

372
  private void updateRoutes(List<VirtualHost> virtualHosts) {
373
    VirtualHost virtualHost =
1✔
374
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
375
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
376
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
377
  }
1✔
378

379
  private String nodeInfo() {
380
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
381
  }
382

383
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
384
    if (virtualHost == null) {
1✔
385
      return Collections.emptySet();
1✔
386
    }
387

388
    // Get all cluster names to which requests can be routed through the virtual host.
389
    Set<String> clusters = new HashSet<>();
1✔
390
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
391
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
392
      if (action == null) {
1✔
393
        continue;
1✔
394
      }
395
      if (action.cluster() != null) {
1✔
396
        clusters.add(action.cluster());
1✔
397
      } else if (action.weightedClusters() != null) {
1✔
398
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
399
          clusters.add(weighedCluster.name());
1✔
400
        }
1✔
401
      }
402
    }
1✔
403

404
    return clusters;
1✔
405
  }
406

407
  private static class TypeWatchers<T extends ResourceUpdate> {
408
    // Key is resource name
409
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
410
    final XdsResourceType<T> resourceType;
411

412
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
413
      this.resourceType = resourceType;
1✔
414
    }
1✔
415
  }
416

417
  public interface XdsConfigWatcher {
418
    /**
419
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
420
     * INTERNAL.
421
     */
422
    void onUpdate(StatusOr<XdsConfig> config);
423
  }
424

425
  private final class ClusterSubscription implements XdsConfig.Subscription {
426
    private final String clusterName;
427
    boolean closed; // Accessed from syncContext
428

429
    public ClusterSubscription(String clusterName) {
1✔
430
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
431
    }
1✔
432

433
    String getClusterName() {
434
      return clusterName;
1✔
435
    }
436

437
    @Override
438
    public void close() {
439
      releaseSubscription(this);
1✔
440
    }
1✔
441
  }
442

443
  /** State for tracing garbage collector. */
444
  private static final class WatcherTracer {
1✔
445
    private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
446
    private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
447

448
    public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
1✔
449
      this.resourceWatchers = resourceWatchers;
1✔
450

451
      this.usedWatchers = new HashMap<>();
1✔
452
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
453
        usedWatchers.put(type, newTypeWatchers(type));
1✔
454
      }
1✔
455
    }
1✔
456

457
    private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
458
        XdsResourceType<T> type) {
459
      return new TypeWatchers<T>(type);
1✔
460
    }
461

462
    public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
463
        XdsResourceType<T> resourceType, String name) {
464
      TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
465
      if (typeWatchers == null) {
1✔
466
        return null;
×
467
      }
468
      assert typeWatchers.resourceType == resourceType;
1✔
469
      @SuppressWarnings("unchecked")
470
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
471
      XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
1✔
472
      if (watcher == null) {
1✔
473
        return null;
×
474
      }
475
      @SuppressWarnings("unchecked")
476
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
1✔
477
      usedTypeWatchers.watchers.put(name, watcher);
1✔
478
      return watcher;
1✔
479
    }
480

481
    /** Shut down unused watchers. */
482
    public void closeUnusedWatchers() {
483
      boolean changed = false; // Help out the GC by preferring old objects
1✔
484
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
485
        TypeWatchers<?> orig = resourceWatchers.get(type);
1✔
486
        TypeWatchers<?> used = usedWatchers.get(type);
1✔
487
        for (String name : orig.watchers.keySet()) {
1✔
488
          if (used.watchers.containsKey(name)) {
1✔
489
            continue;
1✔
490
          }
491
          orig.watchers.get(name).close();
1✔
492
          changed = true;
1✔
493
        }
1✔
494
      }
1✔
495
      if (changed) {
1✔
496
        resourceWatchers.putAll(usedWatchers);
1✔
497
      }
498
    }
1✔
499
  }
500

501
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
502
      implements ResourceWatcher<T> {
503
    private final XdsResourceType<T> type;
504
    private final String resourceName;
505
    boolean cancelled;
506

507
    @Nullable
508
    private StatusOr<T> data;
509

510

511
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
512
      this.type = checkNotNull(type, "type");
1✔
513
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
514
    }
1✔
515

516
    @Override
517
    public void onError(Status error) {
518
      checkNotNull(error, "error");
1✔
519
      if (cancelled) {
1✔
520
        return;
×
521
      }
522
      // Don't update configuration on error, if we've already received configuration
523
      if (!hasDataValue()) {
1✔
524
        this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
525
            String.format("Error retrieving %s: %s: %s",
1✔
526
              toContextString(), error.getCode(), error.getDescription())));
1✔
527
        maybePublishConfig();
1✔
528
      }
529
    }
1✔
530

531
    @Override
532
    public void onResourceDoesNotExist(String resourceName) {
533
      if (cancelled) {
1✔
534
        return;
×
535
      }
536

537
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
538
      this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
539
          toContextString() + " does not exist" + nodeInfo()));
1✔
540
      maybePublishConfig();
1✔
541
    }
1✔
542

543
    @Override
544
    public void onChanged(T update) {
545
      checkNotNull(update, "update");
1✔
546
      if (cancelled) {
1✔
547
        return;
1✔
548
      }
549

550
      this.data = StatusOr.fromValue(update);
1✔
551
      subscribeToChildren(update);
1✔
552
      maybePublishConfig();
1✔
553
    }
1✔
554

555
    protected abstract void subscribeToChildren(T update);
556

557
    public void close() {
558
      cancelled = true;
1✔
559
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
560
    }
1✔
561

562
    boolean missingResult() {
563
      return data == null;
1✔
564
    }
565

566
    @Nullable
567
    StatusOr<T> getData() {
568
      return data;
1✔
569
    }
570

571
    boolean hasDataValue() {
572
      return data != null && data.hasValue();
1✔
573
    }
574

575
    public String toContextString() {
576
      return toContextStr(type.typeName(), resourceName);
1✔
577
    }
578
  }
579

580
  private interface RdsUpdateSupplier {
581
    StatusOr<RdsUpdate> getRdsUpdate();
582
  }
583

584
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
585
      implements RdsUpdateSupplier {
586

587
    private LdsWatcher(String resourceName) {
1✔
588
      super(XdsListenerResource.getInstance(), resourceName);
1✔
589
    }
1✔
590

591
    @Override
592
    public void subscribeToChildren(XdsListenerResource.LdsUpdate update) {
593
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
594
      List<VirtualHost> virtualHosts;
595
      if (httpConnectionManager == null) {
1✔
596
        // TCP listener. Unsupported config
597
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
598
      } else {
599
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
600
      }
601
      if (virtualHosts != null) {
1✔
602
        updateRoutes(virtualHosts);
1✔
603
      }
604

605
      String rdsName = getRdsName(update);
1✔
606
      if (rdsName != null) {
1✔
607
        addRdsWatcher(rdsName);
1✔
608
      }
609
    }
1✔
610

611
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
612
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
613
      if (httpConnectionManager == null) {
1✔
614
        // TCP listener. Unsupported config
615
        return null;
1✔
616
      }
617
      return httpConnectionManager.rdsName();
1✔
618
    }
619

620
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
621
      String rdsName = getRdsName(update);
1✔
622
      if (rdsName == null) {
1✔
623
        return null;
×
624
      }
625
      return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
1✔
626
    }
627

628
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
629
      if (!hasDataValue()) {
1✔
630
        return this;
×
631
      }
632
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
633
      if (hcm == null) {
1✔
634
        return this;
1✔
635
      }
636
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
637
      if (virtualHosts != null) {
1✔
638
        return this;
1✔
639
      }
640
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
641
      assert rdsWatcher != null;
1✔
642
      return rdsWatcher;
1✔
643
    }
644

645
    @Override
646
    public StatusOr<RdsUpdate> getRdsUpdate() {
647
      if (missingResult()) {
1✔
648
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
649
      }
650
      if (!getData().hasValue()) {
1✔
651
        return StatusOr.fromStatus(getData().getStatus());
×
652
      }
653
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
654
      if (hcm == null) {
1✔
655
        return StatusOr.fromStatus(
1✔
656
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
657
      }
658
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
659
      if (virtualHosts == null) {
1✔
660
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
661
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
662
        // bug
663
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
664
      }
665
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
666
    }
667
  }
668

669
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
670

671
    public RdsWatcher(String resourceName) {
1✔
672
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
673
    }
1✔
674

675
    @Override
676
    public void subscribeToChildren(RdsUpdate update) {
677
      updateRoutes(update.virtualHosts);
1✔
678
    }
1✔
679

680
    @Override
681
    public StatusOr<RdsUpdate> getRdsUpdate() {
682
      if (missingResult()) {
1✔
683
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
684
      }
685
      return getData();
1✔
686
    }
687
  }
688

689
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
690
    CdsWatcher(String resourceName) {
1✔
691
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
692
    }
1✔
693

694
    @Override
695
    public void subscribeToChildren(XdsClusterResource.CdsUpdate update) {
696
      switch (update.clusterType()) {
1✔
697
        case EDS:
698
          addEdsWatcher(getEdsServiceName());
1✔
699
          break;
1✔
700
        case LOGICAL_DNS:
701
          // no eds needed
702
          break;
1✔
703
        case AGGREGATE:
704
          update.prioritizedClusterNames()
1✔
705
              .forEach(name -> addClusterWatcher(name));
1✔
706
          break;
1✔
707
        default:
708
      }
709
    }
1✔
710

711
    public String getEdsServiceName() {
712
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
713
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
714
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
715
      if (edsServiceName == null) {
1✔
716
        edsServiceName = cdsUpdate.clusterName();
×
717
      }
718
      return edsServiceName;
1✔
719
    }
720
  }
721

722
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
723
    private EdsWatcher(String resourceName) {
1✔
724
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
725
    }
1✔
726

727
    @Override
728
    public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
1✔
729
  }
730
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc