• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19854

06 Jun 2025 03:43PM UTC coverage: 88.603% (-0.03%) from 88.636%
#19854

push

github

ejona86
xds: Use tracing GC in XdsDepManager

Reference counting doesn't release cycles, so swap to a tracing garbage
collector. This greatly simplifies the code as well, as diffing is no
longer necessary. (If vanilla reference counting was used, diffing
wouldn't have been necessary either as you just increment all the new
objects and decrement the old ones. But that doesn't work when use a set
instead of an integer.)

34697 of 39160 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.88
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.collect.ImmutableList;
25
import io.grpc.NameResolver;
26
import io.grpc.Status;
27
import io.grpc.StatusOr;
28
import io.grpc.SynchronizationContext;
29
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
30
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
31
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
32
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
33
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
34
import io.grpc.xds.client.XdsClient;
35
import io.grpc.xds.client.XdsClient.ResourceWatcher;
36
import io.grpc.xds.client.XdsResourceType;
37
import java.io.Closeable;
38
import java.io.IOException;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.LinkedHashSet;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Objects;
46
import java.util.Set;
47
import java.util.concurrent.ScheduledExecutorService;
48
import javax.annotation.Nullable;
49

50
/**
51
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
52
 * maintains the watchers for the xds resources and when an update is received, it either requests
53
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
54
 * applies to a single data plane authority.
55
 */
56
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
57
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
58
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
59
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
60
  private final String listenerName;
61
  private final XdsClient xdsClient;
62
  private final XdsConfigWatcher xdsConfigWatcher;
63
  private final SynchronizationContext syncContext;
64
  private final String dataPlaneAuthority;
65

66
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
67
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
68
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
69

70
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
71
                       SynchronizationContext syncContext, String dataPlaneAuthority,
72
                       String listenerName, NameResolver.Args nameResolverArgs,
73
                       ScheduledExecutorService scheduler) {
1✔
74
    this.listenerName = checkNotNull(listenerName, "listenerName");
1✔
75
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
76
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
77
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
78
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
79
    checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
80
    checkNotNull(scheduler, "scheduler");
1✔
81

82
    // start the ball rolling
83
    syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
1✔
84
  }
1✔
85

86
  public static String toContextStr(String typeName, String resourceName) {
87
    return typeName + " resource " + resourceName;
1✔
88
  }
89

90
  @Override
91
  public Closeable subscribeToCluster(String clusterName) {
92
    checkNotNull(clusterName, "clusterName");
1✔
93
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
94

95
    syncContext.execute(() -> {
1✔
96
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
97
        subscription.closed = true;
1✔
98
        return; // shutdown() called
1✔
99
      }
100
      subscriptions.add(subscription);
1✔
101
      addClusterWatcher(clusterName);
1✔
102
    });
1✔
103

104
    return subscription;
1✔
105
  }
106

107
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
108
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
109
    XdsResourceType<T> type = watcher.type;
1✔
110
    String resourceName = watcher.resourceName;
1✔
111

112
    getWatchers(type).put(resourceName, watcher);
1✔
113
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
114
  }
1✔
115

116
  public void shutdown() {
117
    syncContext.execute(() -> {
1✔
118
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
119
        shutdownWatchersForType(watchers);
1✔
120
      }
1✔
121
      resourceWatchers.clear();
1✔
122
      subscriptions.clear();
1✔
123
    });
1✔
124
  }
1✔
125

126
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
127
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
128
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
129
          watcherEntry.getValue());
1✔
130
      watcherEntry.getValue().cancelled = true;
1✔
131
    }
1✔
132
  }
1✔
133

134
  private void releaseSubscription(ClusterSubscription subscription) {
135
    checkNotNull(subscription, "subscription");
1✔
136
    syncContext.execute(() -> {
1✔
137
      if (subscription.closed) {
1✔
138
        return;
1✔
139
      }
140
      subscription.closed = true;
1✔
141
      if (!subscriptions.remove(subscription)) {
1✔
142
        return; // shutdown() called
×
143
      }
144
      maybePublishConfig();
1✔
145
    });
1✔
146
  }
1✔
147

148
  /**
149
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
150
   * the watchers.
151
   */
152
  private void maybePublishConfig() {
153
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
154
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
155
      return; // shutdown() called
×
156
    }
157
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
158
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
159
        .anyMatch(XdsWatcherBase::missingResult);
1✔
160
    if (waitingOnResource) {
1✔
161
      return;
1✔
162
    }
163

164
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
165
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
166
      return;
1✔
167
    }
168
    assert newUpdate.hasValue()
1✔
169
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
170
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
171
    lastUpdate = newUpdate;
1✔
172
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
173
  }
1✔
174

175
  @VisibleForTesting
176
  StatusOr<XdsConfig> buildUpdate() {
177
    // Create a config and discard any watchers not accessed
178
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
179
    StatusOr<XdsConfig> config = buildUpdate(
1✔
180
        tracer, listenerName, dataPlaneAuthority, subscriptions);
181
    tracer.closeUnusedWatchers();
1✔
182
    return config;
1✔
183
  }
184

185
  private static StatusOr<XdsConfig> buildUpdate(
186
      WatcherTracer tracer,
187
      String listenerName,
188
      String dataPlaneAuthority,
189
      Set<ClusterSubscription> subscriptions) {
190
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
191

192
    // Iterate watchers and build the XdsConfig
193

194
    XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
195
        = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
1✔
196
    if (ldsWatcher == null) {
1✔
197
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
198
          "Bug: No listener watcher found for " + listenerName));
199
    }
200
    if (!ldsWatcher.getData().hasValue()) {
1✔
201
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
202
    }
203
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
204
    builder.setListener(ldsUpdate);
1✔
205

206
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
207
    if (routeSource == null) {
1✔
208
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
209
          "Bug: No route source found for listener " + dataPlaneAuthority));
210
    }
211
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
212
    if (!statusOrRdsUpdate.hasValue()) {
1✔
213
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
214
    }
215
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
216
    builder.setRoute(rdsUpdate);
1✔
217

218
    VirtualHost activeVirtualHost =
1✔
219
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
220
    if (activeVirtualHost == null) {
1✔
221
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
222
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
223
    }
224
    builder.setVirtualHost(activeVirtualHost);
1✔
225

226
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
227
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
228
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
229
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
230
    }
1✔
231
    for (ClusterSubscription subscription : subscriptions) {
1✔
232
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
233
    }
1✔
234
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
235
      builder.addCluster(me.getKey(), me.getValue());
1✔
236
    }
1✔
237

238
    return StatusOr.fromValue(builder.build());
1✔
239
  }
240

241
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
242
      XdsResourceType<T> resourceType) {
243
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
244
    if (typeWatchers == null) {
1✔
245
      typeWatchers = new TypeWatchers<T>(resourceType);
1✔
246
      resourceWatchers.put(resourceType, typeWatchers);
1✔
247
    }
248
    assert typeWatchers.resourceType == resourceType;
1✔
249
    @SuppressWarnings("unchecked")
250
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
251
    return tTypeWatchers.watchers;
1✔
252
  }
253

254
  private static void addConfigForCluster(
255
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
256
      String clusterName,
257
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
258
      LinkedHashSet<String> ancestors,
259
      WatcherTracer tracer) {
260
    if (clusters.containsKey(clusterName)) {
1✔
261
      return;
1✔
262
    }
263
    if (ancestors.contains(clusterName)) {
1✔
264
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
265
          Status.INTERNAL.withDescription(
1✔
266
              "Aggregate cluster cycle detected: " + ancestors)));
267
      return;
1✔
268
    }
269
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
270
      clusters.put(clusterName, StatusOr.fromStatus(
×
271
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
272
      return;
×
273
    }
274

275
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
1✔
276
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
277
    if (!cdsWatcherDataOr.hasValue()) {
1✔
278
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
279
      return;
1✔
280
    }
281

282
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
283
    XdsConfig.XdsClusterConfig.ClusterChild child;
284
    switch (cdsUpdate.clusterType()) {
1✔
285
      case AGGREGATE:
286
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
287
        // preserves the priority across all aggregate clusters
288
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
289
        ancestors.add(clusterName);
1✔
290
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
291
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
292
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
293
          if (!config.hasValue()) {
1✔
294
            clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
1✔
295
                "Unable to get leaves for " + clusterName + ": "
296
                + config.getStatus().getDescription())));
1✔
297
            return;
1✔
298
          }
299
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
300
          if (children instanceof AggregateConfig) {
1✔
301
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
302
          } else {
303
            leafNames.add(childCluster);
1✔
304
          }
305
        }
1✔
306
        ancestors.remove(clusterName);
1✔
307

308
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
309
        break;
1✔
310
      case EDS:
311
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
312
            tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
1✔
313
        if (edsWatcher != null) {
1✔
314
          child = new EndpointConfig(edsWatcher.getData());
1✔
315
        } else {
316
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
317
              "EDS resource not found for cluster " + clusterName)));
318
        }
319
        break;
×
320
      case LOGICAL_DNS:
321
        // TODO get the resolved endpoint configuration
322
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
323
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
324
        break;
1✔
325
      default:
326
        throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
327
    }
328
    clusters.put(clusterName, StatusOr.fromValue(
1✔
329
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
330
  }
1✔
331

332
  private void addRdsWatcher(String resourceName) {
333
    if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
1✔
334
      return;
×
335
    }
336

337
    addWatcher(new RdsWatcher(resourceName));
1✔
338
  }
1✔
339

340
  private void addEdsWatcher(String edsServiceName) {
341
    if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
1✔
342
      return;
1✔
343
    }
344

345
    addWatcher(new EdsWatcher(edsServiceName));
1✔
346
  }
1✔
347

348
  private void addClusterWatcher(String clusterName) {
349
    if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
1✔
350
      return;
1✔
351
    }
352

353
    addWatcher(new CdsWatcher(clusterName));
1✔
354
  }
1✔
355

356
  private void updateRoutes(List<VirtualHost> virtualHosts) {
357
    VirtualHost virtualHost =
1✔
358
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
359
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
360
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
361
  }
1✔
362

363
  private String nodeInfo() {
364
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
365
  }
366

367
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
368
    if (virtualHost == null) {
1✔
369
      return Collections.emptySet();
1✔
370
    }
371

372
    // Get all cluster names to which requests can be routed through the virtual host.
373
    Set<String> clusters = new HashSet<>();
1✔
374
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
375
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
376
      if (action == null) {
1✔
377
        continue;
1✔
378
      }
379
      if (action.cluster() != null) {
1✔
380
        clusters.add(action.cluster());
1✔
381
      } else if (action.weightedClusters() != null) {
1✔
382
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
383
          clusters.add(weighedCluster.name());
1✔
384
        }
1✔
385
      }
386
    }
1✔
387

388
    return clusters;
1✔
389
  }
390

391
  private static class TypeWatchers<T extends ResourceUpdate> {
392
    // Key is resource name
393
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
394
    final XdsResourceType<T> resourceType;
395

396
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
397
      this.resourceType = resourceType;
1✔
398
    }
1✔
399
  }
400

401
  public interface XdsConfigWatcher {
402
    /**
403
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
404
     * INTERNAL.
405
     */
406
    void onUpdate(StatusOr<XdsConfig> config);
407
  }
408

409
  private final class ClusterSubscription implements Closeable {
410
    private final String clusterName;
411
    boolean closed; // Accessed from syncContext
412

413
    public ClusterSubscription(String clusterName) {
1✔
414
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
415
    }
1✔
416

417
    String getClusterName() {
418
      return clusterName;
1✔
419
    }
420

421
    @Override
422
    public void close() throws IOException {
423
      releaseSubscription(this);
1✔
424
    }
1✔
425
  }
426

427
  /** State for tracing garbage collector. */
428
  private static final class WatcherTracer {
1✔
429
    private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
430
    private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
431

432
    public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
1✔
433
      this.resourceWatchers = resourceWatchers;
1✔
434

435
      this.usedWatchers = new HashMap<>();
1✔
436
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
437
        usedWatchers.put(type, newTypeWatchers(type));
1✔
438
      }
1✔
439
    }
1✔
440

441
    private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
442
        XdsResourceType<T> type) {
443
      return new TypeWatchers<T>(type);
1✔
444
    }
445

446
    public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
447
        XdsResourceType<T> resourceType, String name) {
448
      TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
449
      if (typeWatchers == null) {
1✔
450
        return null;
×
451
      }
452
      assert typeWatchers.resourceType == resourceType;
1✔
453
      @SuppressWarnings("unchecked")
454
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
455
      XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
1✔
456
      if (watcher == null) {
1✔
457
        return null;
×
458
      }
459
      @SuppressWarnings("unchecked")
460
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
1✔
461
      usedTypeWatchers.watchers.put(name, watcher);
1✔
462
      return watcher;
1✔
463
    }
464

465
    /** Shut down unused watchers. */
466
    public void closeUnusedWatchers() {
467
      boolean changed = false; // Help out the GC by preferring old objects
1✔
468
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
469
        TypeWatchers<?> orig = resourceWatchers.get(type);
1✔
470
        TypeWatchers<?> used = usedWatchers.get(type);
1✔
471
        for (String name : orig.watchers.keySet()) {
1✔
472
          if (used.watchers.containsKey(name)) {
1✔
473
            continue;
1✔
474
          }
475
          orig.watchers.get(name).close();
1✔
476
          changed = true;
1✔
477
        }
1✔
478
      }
1✔
479
      if (changed) {
1✔
480
        resourceWatchers.putAll(usedWatchers);
1✔
481
      }
482
    }
1✔
483
  }
484

485
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
486
      implements ResourceWatcher<T> {
487
    private final XdsResourceType<T> type;
488
    private final String resourceName;
489
    boolean cancelled;
490

491
    @Nullable
492
    private StatusOr<T> data;
493

494

495
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
496
      this.type = checkNotNull(type, "type");
1✔
497
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
498
    }
1✔
499

500
    @Override
501
    public void onError(Status error) {
502
      checkNotNull(error, "error");
1✔
503
      if (cancelled) {
1✔
504
        return;
×
505
      }
506
      // Don't update configuration on error, if we've already received configuration
507
      if (!hasDataValue()) {
1✔
508
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
509
            String.format("Error retrieving %s: %s: %s",
1✔
510
              toContextString(), error.getCode(), error.getDescription())));
1✔
511
        maybePublishConfig();
1✔
512
      }
513
    }
1✔
514

515
    @Override
516
    public void onResourceDoesNotExist(String resourceName) {
517
      if (cancelled) {
1✔
518
        return;
×
519
      }
520

521
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
522
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
523
          toContextString() + " does not exist" + nodeInfo()));
1✔
524
      maybePublishConfig();
1✔
525
    }
1✔
526

527
    public void close() {
528
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
529
    }
1✔
530

531
    boolean missingResult() {
532
      return data == null;
1✔
533
    }
534

535
    @Nullable
536
    StatusOr<T> getData() {
537
      return data;
1✔
538
    }
539

540
    boolean hasDataValue() {
541
      return data != null && data.hasValue();
1✔
542
    }
543

544
    String resourceName() {
545
      return resourceName;
1✔
546
    }
547

548
    protected void setData(T data) {
549
      checkNotNull(data, "data");
1✔
550
      this.data = StatusOr.fromValue(data);
1✔
551
    }
1✔
552

553
    protected void setDataAsStatus(Status status) {
554
      checkNotNull(status, "status");
1✔
555
      this.data = StatusOr.fromStatus(status);
1✔
556
    }
1✔
557

558
    public String toContextString() {
559
      return toContextStr(type.typeName(), resourceName);
1✔
560
    }
561
  }
562

563
  private interface RdsUpdateSupplier {
564
    StatusOr<RdsUpdate> getRdsUpdate();
565
  }
566

567
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
568
      implements RdsUpdateSupplier {
569
    String rdsName;
570

571
    private LdsWatcher(String resourceName) {
1✔
572
      super(XdsListenerResource.getInstance(), resourceName);
1✔
573
    }
1✔
574

575
    @Override
576
    public void onChanged(XdsListenerResource.LdsUpdate update) {
577
      checkNotNull(update, "update");
1✔
578
      if (cancelled) {
1✔
579
        return;
1✔
580
      }
581

582
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
583
      List<VirtualHost> virtualHosts;
584
      String rdsName;
585
      if (httpConnectionManager == null) {
1✔
586
        // TCP listener. Unsupported config
587
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
588
        rdsName = null;
1✔
589
      } else {
590
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
591
        rdsName = httpConnectionManager.rdsName();
1✔
592
      }
593

594
      if (virtualHosts != null) {
1✔
595
        // No RDS watcher since we are getting RDS updates via LDS
596
        updateRoutes(virtualHosts);
1✔
597
        this.rdsName = null;
1✔
598
      } else {
599
        this.rdsName = rdsName;
1✔
600
        addRdsWatcher(rdsName);
1✔
601
      }
602

603
      setData(update);
1✔
604
      maybePublishConfig();
1✔
605
    }
1✔
606

607
    @Override
608
    public void onResourceDoesNotExist(String resourceName) {
609
      if (cancelled) {
1✔
610
        return;
×
611
      }
612

613
      checkArgument(resourceName().equals(resourceName), "Resource name does not match");
1✔
614
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
615
          toContextString() + " does not exist" + nodeInfo()));
1✔
616
      rdsName = null;
1✔
617
      maybePublishConfig();
1✔
618
    }
1✔
619

620
    private RdsWatcher getRdsWatcher(WatcherTracer tracer) {
621
      if (rdsName == null) {
1✔
622
        return null;
×
623
      }
624
      return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
1✔
625
    }
626

627
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
628
      if (!hasDataValue()) {
1✔
629
        return this;
×
630
      }
631
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
632
      if (hcm == null) {
1✔
633
        return this;
1✔
634
      }
635
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
636
      if (virtualHosts != null) {
1✔
637
        return this;
1✔
638
      }
639
      RdsWatcher rdsWatcher = getRdsWatcher(tracer);
1✔
640
      assert rdsWatcher != null;
1✔
641
      return rdsWatcher;
1✔
642
    }
643

644
    @Override
645
    public StatusOr<RdsUpdate> getRdsUpdate() {
646
      if (missingResult()) {
1✔
647
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
648
      }
649
      if (!getData().hasValue()) {
1✔
650
        return StatusOr.fromStatus(getData().getStatus());
×
651
      }
652
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
653
      if (hcm == null) {
1✔
654
        return StatusOr.fromStatus(
1✔
655
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
656
      }
657
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
658
      if (virtualHosts == null) {
1✔
659
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
660
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
661
        // bug
662
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
663
      }
664
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
665
    }
666
  }
667

668
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
669

670
    public RdsWatcher(String resourceName) {
1✔
671
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
672
    }
1✔
673

674
    @Override
675
    public void onChanged(RdsUpdate update) {
676
      checkNotNull(update, "update");
1✔
677
      if (cancelled) {
1✔
678
        return;
1✔
679
      }
680
      setData(update);
1✔
681
      updateRoutes(update.virtualHosts);
1✔
682
      maybePublishConfig();
1✔
683
    }
1✔
684

685
    @Override
686
    public StatusOr<RdsUpdate> getRdsUpdate() {
687
      if (missingResult()) {
1✔
688
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
689
      }
690
      return getData();
1✔
691
    }
692
  }
693

694
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
695
    CdsWatcher(String resourceName) {
1✔
696
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
697
    }
1✔
698

699
    @Override
700
    public void onChanged(XdsClusterResource.CdsUpdate update) {
701
      checkNotNull(update, "update");
1✔
702
      if (cancelled) {
1✔
703
        return;
1✔
704
      }
705
      switch (update.clusterType()) {
1✔
706
        case EDS:
707
          setData(update);
1✔
708
          addEdsWatcher(getEdsServiceName());
1✔
709
          break;
1✔
710
        case LOGICAL_DNS:
711
          setData(update);
1✔
712
          // no eds needed
713
          break;
1✔
714
        case AGGREGATE:
715
          setData(update);
1✔
716
          update.prioritizedClusterNames()
1✔
717
              .forEach(name -> addClusterWatcher(name));
1✔
718
          break;
1✔
719
        default:
720
          Status error = Status.UNAVAILABLE.withDescription(
×
721
              "unknown cluster type in " + resourceName() + " " + update.clusterType());
×
722
          setDataAsStatus(error);
×
723
      }
724
      maybePublishConfig();
1✔
725
    }
1✔
726

727
    public String getEdsServiceName() {
728
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
729
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
730
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
731
      if (edsServiceName == null) {
1✔
732
        edsServiceName = cdsUpdate.clusterName();
×
733
      }
734
      return edsServiceName;
1✔
735
    }
736
  }
737

738
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
739
    private EdsWatcher(String resourceName) {
1✔
740
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
741
    }
1✔
742

743
    @Override
744
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
745
      if (cancelled) {
1✔
746
        return;
1✔
747
      }
748
      setData(checkNotNull(update, "update"));
1✔
749
      maybePublishConfig();
1✔
750
    }
1✔
751
  }
752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc