• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19908

16 Jul 2025 07:54PM UTC coverage: 88.593% (+0.07%) from 88.528%
#19908

push

github

ejona86
Revert "xds: Convert CdsLb to XdsDepManager"

This reverts commit 297ab05ef.

b/430347751 shows multiple concerning behaviors in the xDS stack with
the new A74 config update model. XdsDepManager and CdsLB2 still seem to
be working correctly, but the change has exacerbated issues in other
parts of the stack, like RingHashConfig not having equals fixed in
a8de9f07ab.

Revert only for the v1.74.x release, leaving it on master.

34647 of 39108 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.18
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.collect.ImmutableList;
25
import io.grpc.NameResolver;
26
import io.grpc.Status;
27
import io.grpc.StatusOr;
28
import io.grpc.SynchronizationContext;
29
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
30
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
31
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
32
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
33
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
34
import io.grpc.xds.client.XdsClient;
35
import io.grpc.xds.client.XdsClient.ResourceWatcher;
36
import io.grpc.xds.client.XdsResourceType;
37
import java.io.Closeable;
38
import java.io.IOException;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.LinkedHashSet;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Objects;
46
import java.util.Set;
47
import java.util.concurrent.ScheduledExecutorService;
48
import javax.annotation.Nullable;
49

50
/**
51
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
52
 * maintains the watchers for the xds resources and when an update is received, it either requests
53
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
54
 * applies to a single data plane authority.
55
 */
56
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
57
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
58
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
59
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
60
  private final String listenerName;
61
  private final XdsClient xdsClient;
62
  private final XdsConfigWatcher xdsConfigWatcher;
63
  private final SynchronizationContext syncContext;
64
  private final String dataPlaneAuthority;
65

66
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
67
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
68
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
69

70
  /**
   * Creates the manager and schedules the initial listener watch.
   *
   * <p>All arguments must be non-null. {@code nameResolverArgs} and {@code scheduler} are only
   * null-checked here; they are not stored.
   */
  XdsDependencyManager(
      XdsClient xdsClient,
      XdsConfigWatcher xdsConfigWatcher,
      SynchronizationContext syncContext,
      String dataPlaneAuthority,
      String listenerName,
      NameResolver.Args nameResolverArgs,
      ScheduledExecutorService scheduler) {
    this.listenerName = checkNotNull(listenerName, "listenerName");
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
    checkNotNull(nameResolverArgs, "nameResolverArgs");
    checkNotNull(scheduler, "scheduler");

    // Everything else hangs off the listener, so watching it is what kicks off resolution.
    syncContext.execute(() -> {
      addWatcher(new LdsWatcher(listenerName));
    });
  }
1✔
85

86
  public static String toContextStr(String typeName, String resourceName) {
87
    return typeName + " resource " + resourceName;
1✔
88
  }
89

90
  @Override
91
  public Closeable subscribeToCluster(String clusterName) {
92
    checkNotNull(clusterName, "clusterName");
1✔
93
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
94

95
    syncContext.execute(() -> {
1✔
96
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
97
        subscription.closed = true;
1✔
98
        return; // shutdown() called
1✔
99
      }
100
      subscriptions.add(subscription);
1✔
101
      addClusterWatcher(clusterName);
1✔
102
    });
1✔
103

104
    return subscription;
1✔
105
  }
106

107
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
108
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
109
    XdsResourceType<T> type = watcher.type;
1✔
110
    String resourceName = watcher.resourceName;
1✔
111

112
    getWatchers(type).put(resourceName, watcher);
1✔
113
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
114
  }
1✔
115

116
  public void shutdown() {
117
    syncContext.execute(() -> {
1✔
118
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
119
        shutdownWatchersForType(watchers);
1✔
120
      }
1✔
121
      resourceWatchers.clear();
1✔
122
      subscriptions.clear();
1✔
123
    });
1✔
124
  }
1✔
125

126
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
127
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
128
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
129
          watcherEntry.getValue());
1✔
130
      watcherEntry.getValue().cancelled = true;
1✔
131
    }
1✔
132
  }
1✔
133

134
  private void releaseSubscription(ClusterSubscription subscription) {
135
    checkNotNull(subscription, "subscription");
1✔
136
    syncContext.execute(() -> {
1✔
137
      if (subscription.closed) {
1✔
138
        return;
1✔
139
      }
140
      subscription.closed = true;
1✔
141
      if (!subscriptions.remove(subscription)) {
1✔
142
        return; // shutdown() called
×
143
      }
144
      maybePublishConfig();
1✔
145
    });
1✔
146
  }
1✔
147

148
  /**
149
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
150
   * the watchers.
151
   */
152
  private void maybePublishConfig() {
153
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
154
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
155
      return; // shutdown() called
×
156
    }
157
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
158
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
159
        .anyMatch(XdsWatcherBase::missingResult);
1✔
160
    if (waitingOnResource) {
1✔
161
      return;
1✔
162
    }
163

164
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
165
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
166
      return;
1✔
167
    }
168
    assert newUpdate.hasValue()
1✔
169
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
170
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
171
    lastUpdate = newUpdate;
1✔
172
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
173
  }
1✔
174

175
  @VisibleForTesting
176
  StatusOr<XdsConfig> buildUpdate() {
177
    // Create a config and discard any watchers not accessed
178
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
179
    StatusOr<XdsConfig> config = buildUpdate(
1✔
180
        tracer, listenerName, dataPlaneAuthority, subscriptions);
181
    tracer.closeUnusedWatchers();
1✔
182
    return config;
1✔
183
  }
184

185
  private static StatusOr<XdsConfig> buildUpdate(
186
      WatcherTracer tracer,
187
      String listenerName,
188
      String dataPlaneAuthority,
189
      Set<ClusterSubscription> subscriptions) {
190
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
191

192
    // Iterate watchers and build the XdsConfig
193

194
    XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
195
        = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
1✔
196
    if (ldsWatcher == null) {
1✔
197
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
198
          "Bug: No listener watcher found for " + listenerName));
199
    }
200
    if (!ldsWatcher.getData().hasValue()) {
1✔
201
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
202
    }
203
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
204
    builder.setListener(ldsUpdate);
1✔
205

206
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
207
    if (routeSource == null) {
1✔
208
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
209
          "Bug: No route source found for listener " + dataPlaneAuthority));
210
    }
211
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
212
    if (!statusOrRdsUpdate.hasValue()) {
1✔
213
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
214
    }
215
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
216
    builder.setRoute(rdsUpdate);
1✔
217

218
    VirtualHost activeVirtualHost =
1✔
219
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
220
    if (activeVirtualHost == null) {
1✔
221
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
222
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
223
    }
224
    builder.setVirtualHost(activeVirtualHost);
1✔
225

226
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
227
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
228
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
229
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
230
    }
1✔
231
    for (ClusterSubscription subscription : subscriptions) {
1✔
232
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
233
    }
1✔
234
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
235
      builder.addCluster(me.getKey(), me.getValue());
1✔
236
    }
1✔
237

238
    return StatusOr.fromValue(builder.build());
1✔
239
  }
240

241
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
242
      XdsResourceType<T> resourceType) {
243
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
244
    if (typeWatchers == null) {
1✔
245
      typeWatchers = new TypeWatchers<T>(resourceType);
1✔
246
      resourceWatchers.put(resourceType, typeWatchers);
1✔
247
    }
248
    assert typeWatchers.resourceType == resourceType;
1✔
249
    @SuppressWarnings("unchecked")
250
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
251
    return tTypeWatchers.watchers;
1✔
252
  }
253

254
  private static void addConfigForCluster(
255
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
256
      String clusterName,
257
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
258
      LinkedHashSet<String> ancestors,
259
      WatcherTracer tracer) {
260
    if (clusters.containsKey(clusterName)) {
1✔
261
      return;
1✔
262
    }
263
    if (ancestors.contains(clusterName)) {
1✔
264
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
265
          Status.INTERNAL.withDescription(
1✔
266
              "Aggregate cluster cycle detected: " + ancestors)));
267
      return;
1✔
268
    }
269
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
270
      clusters.put(clusterName, StatusOr.fromStatus(
×
271
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
272
      return;
×
273
    }
274

275
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
1✔
276
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
277
    if (!cdsWatcherDataOr.hasValue()) {
1✔
278
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
279
      return;
1✔
280
    }
281

282
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
283
    XdsConfig.XdsClusterConfig.ClusterChild child;
284
    switch (cdsUpdate.clusterType()) {
1✔
285
      case AGGREGATE:
286
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
287
        // preserves the priority across all aggregate clusters
288
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
289
        ancestors.add(clusterName);
1✔
290
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
291
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
292
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
293
          if (!config.hasValue()) {
1✔
294
            clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
1✔
295
                "Unable to get leaves for " + clusterName + ": "
296
                + config.getStatus().getDescription())));
1✔
297
            return;
1✔
298
          }
299
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
300
          if (children instanceof AggregateConfig) {
1✔
301
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
302
          } else {
303
            leafNames.add(childCluster);
1✔
304
          }
305
        }
1✔
306
        ancestors.remove(clusterName);
1✔
307

308
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
309
        break;
1✔
310
      case EDS:
311
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
312
            tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
1✔
313
        if (edsWatcher != null) {
1✔
314
          child = new EndpointConfig(edsWatcher.getData());
1✔
315
        } else {
316
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
317
              "EDS resource not found for cluster " + clusterName)));
318
        }
319
        break;
×
320
      case LOGICAL_DNS:
321
        // TODO get the resolved endpoint configuration
322
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
323
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
324
        break;
1✔
325
      default:
326
        child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
327
              "Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
×
328
    }
329
    clusters.put(clusterName, StatusOr.fromValue(
1✔
330
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
331
  }
1✔
332

333
  private void addRdsWatcher(String resourceName) {
334
    if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
1✔
335
      return;
×
336
    }
337

338
    addWatcher(new RdsWatcher(resourceName));
1✔
339
  }
1✔
340

341
  private void addEdsWatcher(String edsServiceName) {
342
    if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
1✔
343
      return;
1✔
344
    }
345

346
    addWatcher(new EdsWatcher(edsServiceName));
1✔
347
  }
1✔
348

349
  private void addClusterWatcher(String clusterName) {
350
    if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
1✔
351
      return;
1✔
352
    }
353

354
    addWatcher(new CdsWatcher(clusterName));
1✔
355
  }
1✔
356

357
  private void updateRoutes(List<VirtualHost> virtualHosts) {
358
    VirtualHost virtualHost =
1✔
359
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
360
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
361
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
362
  }
1✔
363

364
  private String nodeInfo() {
365
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
366
  }
367

368
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
369
    if (virtualHost == null) {
1✔
370
      return Collections.emptySet();
1✔
371
    }
372

373
    // Get all cluster names to which requests can be routed through the virtual host.
374
    Set<String> clusters = new HashSet<>();
1✔
375
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
376
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
377
      if (action == null) {
1✔
378
        continue;
1✔
379
      }
380
      if (action.cluster() != null) {
1✔
381
        clusters.add(action.cluster());
1✔
382
      } else if (action.weightedClusters() != null) {
1✔
383
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
384
          clusters.add(weighedCluster.name());
1✔
385
        }
1✔
386
      }
387
    }
1✔
388

389
    return clusters;
1✔
390
  }
391

392
  private static class TypeWatchers<T extends ResourceUpdate> {
393
    // Key is resource name
394
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
395
    final XdsResourceType<T> resourceType;
396

397
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
398
      this.resourceType = resourceType;
1✔
399
    }
1✔
400
  }
401

402
  public interface XdsConfigWatcher {
403
    /**
404
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
405
     * INTERNAL.
406
     */
407
    void onUpdate(StatusOr<XdsConfig> config);
408
  }
409

410
  private final class ClusterSubscription implements Closeable {
411
    private final String clusterName;
412
    boolean closed; // Accessed from syncContext
413

414
    public ClusterSubscription(String clusterName) {
1✔
415
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
416
    }
1✔
417

418
    String getClusterName() {
419
      return clusterName;
1✔
420
    }
421

422
    @Override
423
    public void close() throws IOException {
424
      releaseSubscription(this);
1✔
425
    }
1✔
426
  }
427

428
  /** State for tracing garbage collector. */
429
  private static final class WatcherTracer {
1✔
430
    private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
431
    private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
432

433
    public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
1✔
434
      this.resourceWatchers = resourceWatchers;
1✔
435

436
      this.usedWatchers = new HashMap<>();
1✔
437
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
438
        usedWatchers.put(type, newTypeWatchers(type));
1✔
439
      }
1✔
440
    }
1✔
441

442
    private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
443
        XdsResourceType<T> type) {
444
      return new TypeWatchers<T>(type);
1✔
445
    }
446

447
    public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
448
        XdsResourceType<T> resourceType, String name) {
449
      TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
450
      if (typeWatchers == null) {
1✔
451
        return null;
×
452
      }
453
      assert typeWatchers.resourceType == resourceType;
1✔
454
      @SuppressWarnings("unchecked")
455
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
456
      XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
1✔
457
      if (watcher == null) {
1✔
458
        return null;
×
459
      }
460
      @SuppressWarnings("unchecked")
461
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
1✔
462
      usedTypeWatchers.watchers.put(name, watcher);
1✔
463
      return watcher;
1✔
464
    }
465

466
    /** Shut down unused watchers. */
467
    public void closeUnusedWatchers() {
468
      boolean changed = false; // Help out the GC by preferring old objects
1✔
469
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
470
        TypeWatchers<?> orig = resourceWatchers.get(type);
1✔
471
        TypeWatchers<?> used = usedWatchers.get(type);
1✔
472
        for (String name : orig.watchers.keySet()) {
1✔
473
          if (used.watchers.containsKey(name)) {
1✔
474
            continue;
1✔
475
          }
476
          orig.watchers.get(name).close();
1✔
477
          changed = true;
1✔
478
        }
1✔
479
      }
1✔
480
      if (changed) {
1✔
481
        resourceWatchers.putAll(usedWatchers);
1✔
482
      }
483
    }
1✔
484
  }
485

486
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
487
      implements ResourceWatcher<T> {
488
    private final XdsResourceType<T> type;
489
    private final String resourceName;
490
    boolean cancelled;
491

492
    @Nullable
493
    private StatusOr<T> data;
494

495

496
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
497
      this.type = checkNotNull(type, "type");
1✔
498
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
499
    }
1✔
500

501
    @Override
502
    public void onError(Status error) {
503
      checkNotNull(error, "error");
1✔
504
      if (cancelled) {
1✔
505
        return;
×
506
      }
507
      // Don't update configuration on error, if we've already received configuration
508
      if (!hasDataValue()) {
1✔
509
        this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
510
            String.format("Error retrieving %s: %s: %s",
1✔
511
              toContextString(), error.getCode(), error.getDescription())));
1✔
512
        maybePublishConfig();
1✔
513
      }
514
    }
1✔
515

516
    @Override
517
    public void onResourceDoesNotExist(String resourceName) {
518
      if (cancelled) {
1✔
519
        return;
×
520
      }
521

522
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
523
      this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
1✔
524
          toContextString() + " does not exist" + nodeInfo()));
1✔
525
      maybePublishConfig();
1✔
526
    }
1✔
527

528
    @Override
529
    public void onChanged(T update) {
530
      checkNotNull(update, "update");
1✔
531
      if (cancelled) {
1✔
532
        return;
1✔
533
      }
534

535
      this.data = StatusOr.fromValue(update);
1✔
536
      subscribeToChildren(update);
1✔
537
      maybePublishConfig();
1✔
538
    }
1✔
539

540
    protected abstract void subscribeToChildren(T update);
541

542
    public void close() {
543
      cancelled = true;
1✔
544
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
545
    }
1✔
546

547
    boolean missingResult() {
548
      return data == null;
1✔
549
    }
550

551
    @Nullable
552
    StatusOr<T> getData() {
553
      return data;
1✔
554
    }
555

556
    boolean hasDataValue() {
557
      return data != null && data.hasValue();
1✔
558
    }
559

560
    public String toContextString() {
561
      return toContextStr(type.typeName(), resourceName);
1✔
562
    }
563
  }
564

565
  private interface RdsUpdateSupplier {
566
    StatusOr<RdsUpdate> getRdsUpdate();
567
  }
568

569
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
570
      implements RdsUpdateSupplier {
571

572
    private LdsWatcher(String resourceName) {
1✔
573
      super(XdsListenerResource.getInstance(), resourceName);
1✔
574
    }
1✔
575

576
    @Override
577
    public void subscribeToChildren(XdsListenerResource.LdsUpdate update) {
578
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
579
      List<VirtualHost> virtualHosts;
580
      if (httpConnectionManager == null) {
1✔
581
        // TCP listener. Unsupported config
582
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
583
      } else {
584
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
585
      }
586
      if (virtualHosts != null) {
1✔
587
        updateRoutes(virtualHosts);
1✔
588
      }
589

590
      String rdsName = getRdsName(update);
1✔
591
      if (rdsName != null) {
1✔
592
        addRdsWatcher(rdsName);
1✔
593
      }
594
    }
1✔
595

596
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
597
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
598
      if (httpConnectionManager == null) {
1✔
599
        // TCP listener. Unsupported config
600
        return null;
1✔
601
      }
602
      return httpConnectionManager.rdsName();
1✔
603
    }
604

605
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
606
      String rdsName = getRdsName(update);
1✔
607
      if (rdsName == null) {
1✔
608
        return null;
×
609
      }
610
      return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
1✔
611
    }
612

613
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
614
      if (!hasDataValue()) {
1✔
615
        return this;
×
616
      }
617
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
618
      if (hcm == null) {
1✔
619
        return this;
1✔
620
      }
621
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
622
      if (virtualHosts != null) {
1✔
623
        return this;
1✔
624
      }
625
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
626
      assert rdsWatcher != null;
1✔
627
      return rdsWatcher;
1✔
628
    }
629

630
    @Override
631
    public StatusOr<RdsUpdate> getRdsUpdate() {
632
      if (missingResult()) {
1✔
633
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
634
      }
635
      if (!getData().hasValue()) {
1✔
636
        return StatusOr.fromStatus(getData().getStatus());
×
637
      }
638
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
639
      if (hcm == null) {
1✔
640
        return StatusOr.fromStatus(
1✔
641
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
642
      }
643
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
644
      if (virtualHosts == null) {
1✔
645
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
646
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
647
        // bug
648
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
649
      }
650
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
651
    }
652
  }
653

654
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
655

656
    public RdsWatcher(String resourceName) {
1✔
657
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
658
    }
1✔
659

660
    @Override
661
    public void subscribeToChildren(RdsUpdate update) {
662
      updateRoutes(update.virtualHosts);
1✔
663
    }
1✔
664

665
    @Override
666
    public StatusOr<RdsUpdate> getRdsUpdate() {
667
      if (missingResult()) {
1✔
668
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
669
      }
670
      return getData();
1✔
671
    }
672
  }
673

674
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
675
    CdsWatcher(String resourceName) {
1✔
676
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
677
    }
1✔
678

679
    @Override
680
    public void subscribeToChildren(XdsClusterResource.CdsUpdate update) {
681
      switch (update.clusterType()) {
1✔
682
        case EDS:
683
          addEdsWatcher(getEdsServiceName());
1✔
684
          break;
1✔
685
        case LOGICAL_DNS:
686
          // no eds needed
687
          break;
1✔
688
        case AGGREGATE:
689
          update.prioritizedClusterNames()
1✔
690
              .forEach(name -> addClusterWatcher(name));
1✔
691
          break;
1✔
692
        default:
693
      }
694
    }
1✔
695

696
    public String getEdsServiceName() {
697
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
698
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
699
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
700
      if (edsServiceName == null) {
1✔
701
        edsServiceName = cdsUpdate.clusterName();
×
702
      }
703
      return edsServiceName;
1✔
704
    }
705
  }
706

707
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
708
    private EdsWatcher(String resourceName) {
1✔
709
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
710
    }
1✔
711

712
    @Override
713
    public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
1✔
714
  }
715
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc