• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19866

16 Jun 2025 02:32PM UTC coverage: 88.566% (-0.01%) from 88.578%
#19866

push

github

ejona86
xds: Support tracking non-xds resources in XdsDepManager

This will be used for logical dns clusters as part of gRFC A74. Swapping
to EnumMap wasn't really necessary, but was easy given the new type
system.

I can't say I'm particularly happy with the name of the new
TrackedWatcher type, but XdsConfigWatcher prevented using "Watcher"
because it won't implement the new interface, and ResourceWatcher
already exists in XdsClient. So we have TrackedWatcher, WatcherTracer,
TypeWatchers, and TrackedWatcherType.

34547 of 39007 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import io.grpc.NameResolver;
27
import io.grpc.Status;
28
import io.grpc.StatusOr;
29
import io.grpc.SynchronizationContext;
30
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
31
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
32
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
34
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
35
import io.grpc.xds.client.XdsClient;
36
import io.grpc.xds.client.XdsClient.ResourceWatcher;
37
import io.grpc.xds.client.XdsResourceType;
38
import java.util.Collections;
39
import java.util.EnumMap;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.LinkedHashSet;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Objects;
46
import java.util.Set;
47
import java.util.concurrent.ScheduledExecutorService;
48
import javax.annotation.Nullable;
49

50
/**
51
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
52
 * maintains the watchers for the xds resources and when an update is received, it either requests
53
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
54
 * applies to a single data plane authority.
55
 */
56
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
57
  private enum TrackedWatcherTypeEnum {
1✔
58
    LDS, RDS, CDS, EDS
1✔
59
  }
60

61
  private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
1✔
62
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
63
  private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
1✔
64
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
65
  private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
1✔
66
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
67
  private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
1✔
68
      new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
69

70
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
71
  private final String listenerName;
72
  private final XdsClient xdsClient;
73
  private final SynchronizationContext syncContext;
74
  private final String dataPlaneAuthority;
75
  private XdsConfigWatcher xdsConfigWatcher;
76

77
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
78
  private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
1✔
79
      new EnumMap<>(TrackedWatcherTypeEnum.class);
80
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
81

82
  XdsDependencyManager(XdsClient xdsClient,
83
                       SynchronizationContext syncContext, String dataPlaneAuthority,
84
                       String listenerName, NameResolver.Args nameResolverArgs,
85
                       ScheduledExecutorService scheduler) {
1✔
86
    this.listenerName = checkNotNull(listenerName, "listenerName");
1✔
87
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
88
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
89
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
90
    checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
91
    checkNotNull(scheduler, "scheduler");
1✔
92
  }
1✔
93

94
  public static String toContextStr(String typeName, String resourceName) {
95
    return typeName + " resource " + resourceName;
1✔
96
  }
97

98
  public void start(XdsConfigWatcher xdsConfigWatcher) {
99
    checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
1✔
100
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
101
    // start the ball rolling
102
    syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
1✔
103
  }
1✔
104

105
  @Override
106
  public XdsConfig.Subscription subscribeToCluster(String clusterName) {
107
    checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
1✔
108
    checkNotNull(clusterName, "clusterName");
1✔
109
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
110

111
    syncContext.execute(() -> {
1✔
112
      if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
113
        subscription.closed = true;
1✔
114
        return; // shutdown() called
1✔
115
      }
116
      subscriptions.add(subscription);
1✔
117
      addClusterWatcher(clusterName);
1✔
118
    });
1✔
119

120
    return subscription;
1✔
121
  }
122

123
  private <T extends ResourceUpdate> void addWatcher(
124
      TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
125
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
126
    XdsResourceType<T> type = watcher.type;
1✔
127
    String resourceName = watcher.resourceName;
1✔
128

129
    getWatchers(watcherType).put(resourceName, watcher);
1✔
130
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
131
  }
1✔
132

133
  public void shutdown() {
134
    syncContext.execute(() -> {
1✔
135
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
136
        for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
1✔
137
          watcher.close();
1✔
138
        }
1✔
139
      }
1✔
140
      resourceWatchers.clear();
1✔
141
      subscriptions.clear();
1✔
142
    });
1✔
143
  }
1✔
144

145
  private void releaseSubscription(ClusterSubscription subscription) {
146
    checkNotNull(subscription, "subscription");
1✔
147
    syncContext.execute(() -> {
1✔
148
      if (subscription.closed) {
1✔
149
        return;
1✔
150
      }
151
      subscription.closed = true;
1✔
152
      if (!subscriptions.remove(subscription)) {
1✔
153
        return; // shutdown() called
×
154
      }
155
      maybePublishConfig();
1✔
156
    });
1✔
157
  }
1✔
158

159
  /**
160
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
161
   * the watchers.
162
   */
163
  private void maybePublishConfig() {
164
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
165
    if (getWatchers(LDS_TYPE).isEmpty()) {
1✔
166
      return; // shutdown() called
×
167
    }
168
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
169
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
170
        .anyMatch(TrackedWatcher::missingResult);
1✔
171
    if (waitingOnResource) {
1✔
172
      return;
1✔
173
    }
174

175
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
176
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
177
      return;
1✔
178
    }
179
    assert newUpdate.hasValue()
1✔
180
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
181
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
182
    lastUpdate = newUpdate;
1✔
183
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
184
  }
1✔
185

186
  @VisibleForTesting
187
  StatusOr<XdsConfig> buildUpdate() {
188
    // Create a config and discard any watchers not accessed
189
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
190
    StatusOr<XdsConfig> config = buildUpdate(
1✔
191
        tracer, listenerName, dataPlaneAuthority, subscriptions);
192
    tracer.closeUnusedWatchers();
1✔
193
    return config;
1✔
194
  }
195

196
  private static StatusOr<XdsConfig> buildUpdate(
197
      WatcherTracer tracer,
198
      String listenerName,
199
      String dataPlaneAuthority,
200
      Set<ClusterSubscription> subscriptions) {
201
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
202

203
    // Iterate watchers and build the XdsConfig
204

205
    TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
206
        = tracer.getWatcher(LDS_TYPE, listenerName);
1✔
207
    if (ldsWatcher == null) {
1✔
208
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
209
          "Bug: No listener watcher found for " + listenerName));
210
    }
211
    if (!ldsWatcher.getData().hasValue()) {
1✔
212
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
213
    }
214
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
215
    builder.setListener(ldsUpdate);
1✔
216

217
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
218
    if (routeSource == null) {
1✔
219
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
220
          "Bug: No route source found for listener " + dataPlaneAuthority));
221
    }
222
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
223
    if (!statusOrRdsUpdate.hasValue()) {
1✔
224
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
225
    }
226
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
227
    builder.setRoute(rdsUpdate);
1✔
228

229
    VirtualHost activeVirtualHost =
1✔
230
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
231
    if (activeVirtualHost == null) {
1✔
232
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
233
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
234
    }
235
    builder.setVirtualHost(activeVirtualHost);
1✔
236

237
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
238
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
239
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
240
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
241
    }
1✔
242
    for (ClusterSubscription subscription : subscriptions) {
1✔
243
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
244
    }
1✔
245
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
246
      builder.addCluster(me.getKey(), me.getValue());
1✔
247
    }
1✔
248

249
    return StatusOr.fromValue(builder.build());
1✔
250
  }
251

252
  private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
253
    TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
254
    if (typeWatchers == null) {
1✔
255
      typeWatchers = new TypeWatchers<T>(watcherType);
1✔
256
      resourceWatchers.put(watcherType.typeEnum, typeWatchers);
1✔
257
    }
258
    assert typeWatchers.watcherType == watcherType;
1✔
259
    @SuppressWarnings("unchecked")
260
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
261
    return tTypeWatchers.watchers;
1✔
262
  }
263

264
  private static void addConfigForCluster(
265
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
266
      String clusterName,
267
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
268
      LinkedHashSet<String> ancestors,
269
      WatcherTracer tracer) {
270
    if (clusters.containsKey(clusterName)) {
1✔
271
      return;
1✔
272
    }
273
    if (ancestors.contains(clusterName)) {
1✔
274
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
275
          Status.INTERNAL.withDescription(
1✔
276
              "Aggregate cluster cycle detected: " + ancestors)));
277
      return;
1✔
278
    }
279
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
280
      clusters.put(clusterName, StatusOr.fromStatus(
×
281
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
282
      return;
×
283
    }
284

285
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
1✔
286
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
287
    if (!cdsWatcherDataOr.hasValue()) {
1✔
288
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
289
      return;
1✔
290
    }
291

292
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
293
    XdsConfig.XdsClusterConfig.ClusterChild child;
294
    switch (cdsUpdate.clusterType()) {
1✔
295
      case AGGREGATE:
296
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
297
        // preserves the priority across all aggregate clusters
298
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
299
        ancestors.add(clusterName);
1✔
300
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
301
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
302
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
303
          if (!config.hasValue()) {
1✔
304
            // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
305
            // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
306
            // watchers reports a transient ADS stream error, the policy should report that it is in
307
            // TRANSIENT_FAILURE if it has never passed a config to its child.
308
            //
309
            // But there's currently disagreement about whether that is actually what we want, and
310
            // that was not originally implemented in gRPC Java. So we're keeping Java's old
311
            // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
312
            // cycle).
313
            leafNames.add(childCluster);
1✔
314
            continue;
1✔
315
          }
316
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
317
          if (children instanceof AggregateConfig) {
1✔
318
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
319
          } else {
320
            leafNames.add(childCluster);
1✔
321
          }
322
        }
1✔
323
        ancestors.remove(clusterName);
1✔
324

325
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
326
        break;
1✔
327
      case EDS:
328
        TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
329
            tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
1✔
330
        if (edsWatcher != null) {
1✔
331
          child = new EndpointConfig(edsWatcher.getData());
1✔
332
        } else {
333
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
334
              "EDS resource not found for cluster " + clusterName)));
335
        }
336
        break;
×
337
      case LOGICAL_DNS:
338
        // TODO get the resolved endpoint configuration
339
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
340
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
341
        break;
1✔
342
      default:
343
        child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
344
              "Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
×
345
    }
346
    if (clusters.containsKey(clusterName)) {
1✔
347
      // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
348
      // present. We don't want to overwrite it with a non-error value.
349
      return;
1✔
350
    }
351
    clusters.put(clusterName, StatusOr.fromValue(
1✔
352
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
353
  }
1✔
354

355
  private void addRdsWatcher(String resourceName) {
356
    if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
1✔
357
      return;
×
358
    }
359

360
    addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
1✔
361
  }
1✔
362

363
  private void addEdsWatcher(String edsServiceName) {
364
    if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
1✔
365
      return;
1✔
366
    }
367

368
    addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
1✔
369
  }
1✔
370

371
  private void addClusterWatcher(String clusterName) {
372
    if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
1✔
373
      return;
1✔
374
    }
375

376
    addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
1✔
377
  }
1✔
378

379
  private void updateRoutes(List<VirtualHost> virtualHosts) {
380
    VirtualHost virtualHost =
1✔
381
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
382
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
383
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
384
  }
1✔
385

386
  private String nodeInfo() {
387
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
388
  }
389

390
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
391
    if (virtualHost == null) {
1✔
392
      return Collections.emptySet();
1✔
393
    }
394

395
    // Get all cluster names to which requests can be routed through the virtual host.
396
    Set<String> clusters = new HashSet<>();
1✔
397
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
398
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
399
      if (action == null) {
1✔
400
        continue;
1✔
401
      }
402
      if (action.cluster() != null) {
1✔
403
        clusters.add(action.cluster());
1✔
404
      } else if (action.weightedClusters() != null) {
1✔
405
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
406
          clusters.add(weighedCluster.name());
1✔
407
        }
1✔
408
      }
409
    }
1✔
410

411
    return clusters;
1✔
412
  }
413

414
  private static class TypeWatchers<T> {
415
    // Key is resource name
416
    final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
1✔
417
    final TrackedWatcherType<T> watcherType;
418

419
    TypeWatchers(TrackedWatcherType<T> watcherType) {
1✔
420
      this.watcherType = checkNotNull(watcherType, "watcherType");
1✔
421
    }
1✔
422
  }
423

424
  public interface XdsConfigWatcher {
425
    /**
426
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
427
     * INTERNAL.
428
     */
429
    void onUpdate(StatusOr<XdsConfig> config);
430
  }
431

432
  private final class ClusterSubscription implements XdsConfig.Subscription {
433
    private final String clusterName;
434
    boolean closed; // Accessed from syncContext
435

436
    public ClusterSubscription(String clusterName) {
1✔
437
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
438
    }
1✔
439

440
    String getClusterName() {
441
      return clusterName;
1✔
442
    }
443

444
    @Override
445
    public void close() {
446
      releaseSubscription(this);
1✔
447
    }
1✔
448
  }
449

450
  /** State for tracing garbage collector. */
451
  private static final class WatcherTracer {
1✔
452
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
453
    private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
454

455
    public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
1✔
456
      this.resourceWatchers = resourceWatchers;
1✔
457

458
      this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
1✔
459
      for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
1✔
460
        usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
1✔
461
      }
1✔
462
    }
1✔
463

464
    private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
465
      return new TypeWatchers<T>(type);
1✔
466
    }
467

468
    public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
469
      TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
1✔
470
      if (typeWatchers == null) {
1✔
471
        return null;
×
472
      }
473
      assert typeWatchers.watcherType == watcherType;
1✔
474
      @SuppressWarnings("unchecked")
475
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
476
      TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
1✔
477
      if (watcher == null) {
1✔
478
        return null;
×
479
      }
480
      @SuppressWarnings("unchecked")
481
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
1✔
482
      usedTypeWatchers.watchers.put(name, watcher);
1✔
483
      return watcher;
1✔
484
    }
485

486
    /** Shut down unused watchers. */
487
    public void closeUnusedWatchers() {
488
      boolean changed = false; // Help out the GC by preferring old objects
1✔
489
      for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
1✔
490
        TypeWatchers<?> orig = resourceWatchers.get(key);
1✔
491
        TypeWatchers<?> used = usedWatchers.get(key);
1✔
492
        for (String name : orig.watchers.keySet()) {
1✔
493
          if (used.watchers.containsKey(name)) {
1✔
494
            continue;
1✔
495
          }
496
          orig.watchers.get(name).close();
1✔
497
          changed = true;
1✔
498
        }
1✔
499
      }
1✔
500
      if (changed) {
1✔
501
        resourceWatchers.putAll(usedWatchers);
1✔
502
      }
503
    }
1✔
504
  }
505

506
  @SuppressWarnings("UnusedTypeParameter")
507
  private static final class TrackedWatcherType<T> {
508
    public final TrackedWatcherTypeEnum typeEnum;
509

510
    public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
1✔
511
      this.typeEnum = checkNotNull(typeEnum, "typeEnum");
1✔
512
    }
1✔
513
  }
514

515
  private interface TrackedWatcher<T> {
516
    @Nullable
517
    StatusOr<T> getData();
518

519
    default boolean missingResult() {
520
      return getData() == null;
1✔
521
    }
522

523
    default boolean hasDataValue() {
524
      StatusOr<T> data = getData();
1✔
525
      return data != null && data.hasValue();
1✔
526
    }
527

528
    void close();
529
  }
530

531
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
532
      implements ResourceWatcher<T>, TrackedWatcher<T> {
533
    private final XdsResourceType<T> type;
534
    private final String resourceName;
535
    boolean cancelled;
536

537
    @Nullable
538
    private StatusOr<T> data;
539

540

541
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
542
      this.type = checkNotNull(type, "type");
1✔
543
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
544
    }
1✔
545

546
    @Override
547
    public void onError(Status error) {
548
      checkNotNull(error, "error");
1✔
549
      if (cancelled) {
1✔
550
        return;
×
551
      }
552
      // Don't update configuration on error, if we've already received configuration
553
      if (!hasDataValue()) {
1✔
554
        this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
555
            String.format("Error retrieving %s: %s: %s",
1✔
556
              toContextString(), error.getCode(), error.getDescription())));
1✔
557
        maybePublishConfig();
1✔
558
      }
559
    }
1✔
560

561
    @Override
562
    public void onResourceDoesNotExist(String resourceName) {
563
      if (cancelled) {
1✔
564
        return;
×
565
      }
566

567
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
568
      this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
569
          toContextString() + " does not exist" + nodeInfo()));
1✔
570
      maybePublishConfig();
1✔
571
    }
1✔
572

573
    @Override
574
    public void onChanged(T update) {
575
      checkNotNull(update, "update");
1✔
576
      if (cancelled) {
1✔
577
        return;
1✔
578
      }
579

580
      this.data = StatusOr.fromValue(update);
1✔
581
      subscribeToChildren(update);
1✔
582
      maybePublishConfig();
1✔
583
    }
1✔
584

585
    protected abstract void subscribeToChildren(T update);
586

587
    @Override
588
    public void close() {
589
      cancelled = true;
1✔
590
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
591
    }
1✔
592

593
    @Override
594
    @Nullable
595
    public StatusOr<T> getData() {
596
      return data;
1✔
597
    }
598

599
    public String toContextString() {
600
      return toContextStr(type.typeName(), resourceName);
1✔
601
    }
602
  }
603

604
  private interface RdsUpdateSupplier {
605
    StatusOr<RdsUpdate> getRdsUpdate();
606
  }
607

608
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
609
      implements RdsUpdateSupplier {
610

611
    private LdsWatcher(String resourceName) {
1✔
612
      super(XdsListenerResource.getInstance(), resourceName);
1✔
613
    }
1✔
614

615
    @Override
616
    public void subscribeToChildren(XdsListenerResource.LdsUpdate update) {
617
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
618
      List<VirtualHost> virtualHosts;
619
      if (httpConnectionManager == null) {
1✔
620
        // TCP listener. Unsupported config
621
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
622
      } else {
623
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
624
      }
625
      if (virtualHosts != null) {
1✔
626
        updateRoutes(virtualHosts);
1✔
627
      }
628

629
      String rdsName = getRdsName(update);
1✔
630
      if (rdsName != null) {
1✔
631
        addRdsWatcher(rdsName);
1✔
632
      }
633
    }
1✔
634

635
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
636
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
637
      if (httpConnectionManager == null) {
1✔
638
        // TCP listener. Unsupported config
639
        return null;
1✔
640
      }
641
      return httpConnectionManager.rdsName();
1✔
642
    }
643

644
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
645
      String rdsName = getRdsName(update);
1✔
646
      if (rdsName == null) {
1✔
647
        return null;
×
648
      }
649
      return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
1✔
650
    }
651

652
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
653
      if (!hasDataValue()) {
1✔
654
        return this;
×
655
      }
656
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
657
      if (hcm == null) {
1✔
658
        return this;
1✔
659
      }
660
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
661
      if (virtualHosts != null) {
1✔
662
        return this;
1✔
663
      }
664
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
665
      assert rdsWatcher != null;
1✔
666
      return rdsWatcher;
1✔
667
    }
668

669
    @Override
670
    public StatusOr<RdsUpdate> getRdsUpdate() {
671
      if (missingResult()) {
1✔
672
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
673
      }
674
      if (!getData().hasValue()) {
1✔
675
        return StatusOr.fromStatus(getData().getStatus());
×
676
      }
677
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
678
      if (hcm == null) {
1✔
679
        return StatusOr.fromStatus(
1✔
680
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
681
      }
682
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
683
      if (virtualHosts == null) {
1✔
684
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
685
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
686
        // bug
687
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
688
      }
689
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
690
    }
691
  }
692

693
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
694

695
    public RdsWatcher(String resourceName) {
1✔
696
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
697
    }
1✔
698

699
    @Override
700
    public void subscribeToChildren(RdsUpdate update) {
701
      updateRoutes(update.virtualHosts);
1✔
702
    }
1✔
703

704
    @Override
705
    public StatusOr<RdsUpdate> getRdsUpdate() {
706
      if (missingResult()) {
1✔
707
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
708
      }
709
      return getData();
1✔
710
    }
711
  }
712

713
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
714
    CdsWatcher(String resourceName) {
1✔
715
      super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
716
    }
1✔
717

718
    @Override
719
    public void subscribeToChildren(XdsClusterResource.CdsUpdate update) {
720
      switch (update.clusterType()) {
1✔
721
        case EDS:
722
          addEdsWatcher(getEdsServiceName());
1✔
723
          break;
1✔
724
        case LOGICAL_DNS:
725
          // no eds needed
726
          break;
1✔
727
        case AGGREGATE:
728
          update.prioritizedClusterNames()
1✔
729
              .forEach(name -> addClusterWatcher(name));
1✔
730
          break;
1✔
731
        default:
732
      }
733
    }
1✔
734

735
    public String getEdsServiceName() {
736
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
737
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
738
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
739
      if (edsServiceName == null) {
1✔
740
        edsServiceName = cdsUpdate.clusterName();
×
741
      }
742
      return edsServiceName;
1✔
743
    }
744
  }
745

746
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
747
    private EdsWatcher(String resourceName) {
1✔
748
      super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
749
    }
1✔
750

751
    @Override
752
    public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
1✔
753
  }
754
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc