• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19856

09 Jun 2025 02:13PM UTC coverage: 88.61% (+0.001%) from 88.609%
#19856

push

github

ejona86
xds: Don't cache rdsName in XdsDepManager

We can easily compute the rdsName and avoiding the state means we don't
need to override onResourceDoesNotExist() to keep the cache in-sync with
the config.

34696 of 39156 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.8
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.collect.ImmutableList;
25
import io.grpc.NameResolver;
26
import io.grpc.Status;
27
import io.grpc.StatusOr;
28
import io.grpc.SynchronizationContext;
29
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
30
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
31
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
32
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
33
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
34
import io.grpc.xds.client.XdsClient;
35
import io.grpc.xds.client.XdsClient.ResourceWatcher;
36
import io.grpc.xds.client.XdsResourceType;
37
import java.io.Closeable;
38
import java.io.IOException;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.LinkedHashSet;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Objects;
46
import java.util.Set;
47
import java.util.concurrent.ScheduledExecutorService;
48
import javax.annotation.Nullable;
49

50
/**
51
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
52
 * maintains the watchers for the xds resources and when an update is received, it either requests
53
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
54
 * applies to a single data plane authority.
55
 */
56
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
57
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
58
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
59
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
60
  private final String listenerName;
61
  private final XdsClient xdsClient;
62
  private final XdsConfigWatcher xdsConfigWatcher;
63
  private final SynchronizationContext syncContext;
64
  private final String dataPlaneAuthority;
65

66
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
67
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
68
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
69

70
  /**
   * Stores the collaborators and schedules the listener watch on {@code syncContext}.
   *
   * <p>Every argument must be non-null. {@code nameResolverArgs} and {@code scheduler} are only
   * validated here; this constructor does not retain them.
   */
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
                       SynchronizationContext syncContext, String dataPlaneAuthority,
                       String listenerName, NameResolver.Args nameResolverArgs,
                       ScheduledExecutorService scheduler) {
    this.listenerName = checkNotNull(listenerName, "listenerName");
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
    checkNotNull(nameResolverArgs, "nameResolverArgs");
    checkNotNull(scheduler, "scheduler");

    // The listener watcher is the root: its updates are what add the route and cluster watchers.
    syncContext.execute(() -> {
      LdsWatcher rootWatcher = new LdsWatcher(listenerName);
      addWatcher(rootWatcher);
    });
  }
1✔
85

86
  public static String toContextStr(String typeName, String resourceName) {
87
    return typeName + " resource " + resourceName;
1✔
88
  }
89

90
  @Override
91
  public Closeable subscribeToCluster(String clusterName) {
92
    checkNotNull(clusterName, "clusterName");
1✔
93
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
94

95
    syncContext.execute(() -> {
1✔
96
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
97
        subscription.closed = true;
1✔
98
        return; // shutdown() called
1✔
99
      }
100
      subscriptions.add(subscription);
1✔
101
      addClusterWatcher(clusterName);
1✔
102
    });
1✔
103

104
    return subscription;
1✔
105
  }
106

107
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
108
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
109
    XdsResourceType<T> type = watcher.type;
1✔
110
    String resourceName = watcher.resourceName;
1✔
111

112
    getWatchers(type).put(resourceName, watcher);
1✔
113
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
114
  }
1✔
115

116
  public void shutdown() {
117
    syncContext.execute(() -> {
1✔
118
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
119
        shutdownWatchersForType(watchers);
1✔
120
      }
1✔
121
      resourceWatchers.clear();
1✔
122
      subscriptions.clear();
1✔
123
    });
1✔
124
  }
1✔
125

126
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
127
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
128
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
129
          watcherEntry.getValue());
1✔
130
      watcherEntry.getValue().cancelled = true;
1✔
131
    }
1✔
132
  }
1✔
133

134
  private void releaseSubscription(ClusterSubscription subscription) {
135
    checkNotNull(subscription, "subscription");
1✔
136
    syncContext.execute(() -> {
1✔
137
      if (subscription.closed) {
1✔
138
        return;
1✔
139
      }
140
      subscription.closed = true;
1✔
141
      if (!subscriptions.remove(subscription)) {
1✔
142
        return; // shutdown() called
×
143
      }
144
      maybePublishConfig();
1✔
145
    });
1✔
146
  }
1✔
147

148
  /**
149
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
150
   * the watchers.
151
   */
152
  private void maybePublishConfig() {
153
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
154
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
155
      return; // shutdown() called
×
156
    }
157
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
158
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
159
        .anyMatch(XdsWatcherBase::missingResult);
1✔
160
    if (waitingOnResource) {
1✔
161
      return;
1✔
162
    }
163

164
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
165
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
166
      return;
1✔
167
    }
168
    assert newUpdate.hasValue()
1✔
169
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
170
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
171
    lastUpdate = newUpdate;
1✔
172
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
173
  }
1✔
174

175
  @VisibleForTesting
176
  StatusOr<XdsConfig> buildUpdate() {
177
    // Create a config and discard any watchers not accessed
178
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
179
    StatusOr<XdsConfig> config = buildUpdate(
1✔
180
        tracer, listenerName, dataPlaneAuthority, subscriptions);
181
    tracer.closeUnusedWatchers();
1✔
182
    return config;
1✔
183
  }
184

185
  private static StatusOr<XdsConfig> buildUpdate(
186
      WatcherTracer tracer,
187
      String listenerName,
188
      String dataPlaneAuthority,
189
      Set<ClusterSubscription> subscriptions) {
190
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
191

192
    // Iterate watchers and build the XdsConfig
193

194
    XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
195
        = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
1✔
196
    if (ldsWatcher == null) {
1✔
197
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
198
          "Bug: No listener watcher found for " + listenerName));
199
    }
200
    if (!ldsWatcher.getData().hasValue()) {
1✔
201
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
202
    }
203
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
204
    builder.setListener(ldsUpdate);
1✔
205

206
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
207
    if (routeSource == null) {
1✔
208
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
209
          "Bug: No route source found for listener " + dataPlaneAuthority));
210
    }
211
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
212
    if (!statusOrRdsUpdate.hasValue()) {
1✔
213
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
214
    }
215
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
216
    builder.setRoute(rdsUpdate);
1✔
217

218
    VirtualHost activeVirtualHost =
1✔
219
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
220
    if (activeVirtualHost == null) {
1✔
221
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
222
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
223
    }
224
    builder.setVirtualHost(activeVirtualHost);
1✔
225

226
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
227
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
228
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
229
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
230
    }
1✔
231
    for (ClusterSubscription subscription : subscriptions) {
1✔
232
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
233
    }
1✔
234
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
235
      builder.addCluster(me.getKey(), me.getValue());
1✔
236
    }
1✔
237

238
    return StatusOr.fromValue(builder.build());
1✔
239
  }
240

241
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
242
      XdsResourceType<T> resourceType) {
243
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
244
    if (typeWatchers == null) {
1✔
245
      typeWatchers = new TypeWatchers<T>(resourceType);
1✔
246
      resourceWatchers.put(resourceType, typeWatchers);
1✔
247
    }
248
    assert typeWatchers.resourceType == resourceType;
1✔
249
    @SuppressWarnings("unchecked")
250
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
251
    return tTypeWatchers.watchers;
1✔
252
  }
253

254
  private static void addConfigForCluster(
255
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
256
      String clusterName,
257
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
258
      LinkedHashSet<String> ancestors,
259
      WatcherTracer tracer) {
260
    if (clusters.containsKey(clusterName)) {
1✔
261
      return;
1✔
262
    }
263
    if (ancestors.contains(clusterName)) {
1✔
264
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
265
          Status.INTERNAL.withDescription(
1✔
266
              "Aggregate cluster cycle detected: " + ancestors)));
267
      return;
1✔
268
    }
269
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
270
      clusters.put(clusterName, StatusOr.fromStatus(
×
271
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
272
      return;
×
273
    }
274

275
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
1✔
276
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
277
    if (!cdsWatcherDataOr.hasValue()) {
1✔
278
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
279
      return;
1✔
280
    }
281

282
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
283
    XdsConfig.XdsClusterConfig.ClusterChild child;
284
    switch (cdsUpdate.clusterType()) {
1✔
285
      case AGGREGATE:
286
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
287
        // preserves the priority across all aggregate clusters
288
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
289
        ancestors.add(clusterName);
1✔
290
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
291
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
292
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
293
          if (!config.hasValue()) {
1✔
294
            clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
1✔
295
                "Unable to get leaves for " + clusterName + ": "
296
                + config.getStatus().getDescription())));
1✔
297
            return;
1✔
298
          }
299
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
300
          if (children instanceof AggregateConfig) {
1✔
301
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
302
          } else {
303
            leafNames.add(childCluster);
1✔
304
          }
305
        }
1✔
306
        ancestors.remove(clusterName);
1✔
307

308
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
309
        break;
1✔
310
      case EDS:
311
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
312
            tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
1✔
313
        if (edsWatcher != null) {
1✔
314
          child = new EndpointConfig(edsWatcher.getData());
1✔
315
        } else {
316
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
317
              "EDS resource not found for cluster " + clusterName)));
318
        }
319
        break;
×
320
      case LOGICAL_DNS:
321
        // TODO get the resolved endpoint configuration
322
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
323
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
324
        break;
1✔
325
      default:
326
        throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
327
    }
328
    clusters.put(clusterName, StatusOr.fromValue(
1✔
329
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
330
  }
1✔
331

332
  private void addRdsWatcher(String resourceName) {
333
    if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
1✔
334
      return;
×
335
    }
336

337
    addWatcher(new RdsWatcher(resourceName));
1✔
338
  }
1✔
339

340
  private void addEdsWatcher(String edsServiceName) {
341
    if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
1✔
342
      return;
1✔
343
    }
344

345
    addWatcher(new EdsWatcher(edsServiceName));
1✔
346
  }
1✔
347

348
  private void addClusterWatcher(String clusterName) {
349
    if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
1✔
350
      return;
1✔
351
    }
352

353
    addWatcher(new CdsWatcher(clusterName));
1✔
354
  }
1✔
355

356
  private void updateRoutes(List<VirtualHost> virtualHosts) {
357
    VirtualHost virtualHost =
1✔
358
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
359
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
360
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
361
  }
1✔
362

363
  private String nodeInfo() {
364
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
365
  }
366

367
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
368
    if (virtualHost == null) {
1✔
369
      return Collections.emptySet();
1✔
370
    }
371

372
    // Get all cluster names to which requests can be routed through the virtual host.
373
    Set<String> clusters = new HashSet<>();
1✔
374
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
375
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
376
      if (action == null) {
1✔
377
        continue;
1✔
378
      }
379
      if (action.cluster() != null) {
1✔
380
        clusters.add(action.cluster());
1✔
381
      } else if (action.weightedClusters() != null) {
1✔
382
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
383
          clusters.add(weighedCluster.name());
1✔
384
        }
1✔
385
      }
386
    }
1✔
387

388
    return clusters;
1✔
389
  }
390

391
  private static class TypeWatchers<T extends ResourceUpdate> {
392
    // Key is resource name
393
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
394
    final XdsResourceType<T> resourceType;
395

396
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
397
      this.resourceType = resourceType;
1✔
398
    }
1✔
399
  }
400

401
  public interface XdsConfigWatcher {
402
    /**
403
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
404
     * INTERNAL.
405
     */
406
    void onUpdate(StatusOr<XdsConfig> config);
407
  }
408

409
  private final class ClusterSubscription implements Closeable {
410
    private final String clusterName;
411
    boolean closed; // Accessed from syncContext
412

413
    public ClusterSubscription(String clusterName) {
1✔
414
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
415
    }
1✔
416

417
    String getClusterName() {
418
      return clusterName;
1✔
419
    }
420

421
    @Override
422
    public void close() throws IOException {
423
      releaseSubscription(this);
1✔
424
    }
1✔
425
  }
426

427
  /** State for tracing garbage collector. */
428
  private static final class WatcherTracer {
1✔
429
    private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
430
    private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
431

432
    public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
1✔
433
      this.resourceWatchers = resourceWatchers;
1✔
434

435
      this.usedWatchers = new HashMap<>();
1✔
436
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
437
        usedWatchers.put(type, newTypeWatchers(type));
1✔
438
      }
1✔
439
    }
1✔
440

441
    private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
442
        XdsResourceType<T> type) {
443
      return new TypeWatchers<T>(type);
1✔
444
    }
445

446
    public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
447
        XdsResourceType<T> resourceType, String name) {
448
      TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
449
      if (typeWatchers == null) {
1✔
450
        return null;
×
451
      }
452
      assert typeWatchers.resourceType == resourceType;
1✔
453
      @SuppressWarnings("unchecked")
454
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
455
      XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
1✔
456
      if (watcher == null) {
1✔
457
        return null;
×
458
      }
459
      @SuppressWarnings("unchecked")
460
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
1✔
461
      usedTypeWatchers.watchers.put(name, watcher);
1✔
462
      return watcher;
1✔
463
    }
464

465
    /** Shut down unused watchers. */
466
    public void closeUnusedWatchers() {
467
      boolean changed = false; // Help out the GC by preferring old objects
1✔
468
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
469
        TypeWatchers<?> orig = resourceWatchers.get(type);
1✔
470
        TypeWatchers<?> used = usedWatchers.get(type);
1✔
471
        for (String name : orig.watchers.keySet()) {
1✔
472
          if (used.watchers.containsKey(name)) {
1✔
473
            continue;
1✔
474
          }
475
          orig.watchers.get(name).close();
1✔
476
          changed = true;
1✔
477
        }
1✔
478
      }
1✔
479
      if (changed) {
1✔
480
        resourceWatchers.putAll(usedWatchers);
1✔
481
      }
482
    }
1✔
483
  }
484

485
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
486
      implements ResourceWatcher<T> {
487
    private final XdsResourceType<T> type;
488
    private final String resourceName;
489
    boolean cancelled;
490

491
    @Nullable
492
    private StatusOr<T> data;
493

494

495
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
496
      this.type = checkNotNull(type, "type");
1✔
497
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
498
    }
1✔
499

500
    @Override
501
    public void onError(Status error) {
502
      checkNotNull(error, "error");
1✔
503
      if (cancelled) {
1✔
504
        return;
×
505
      }
506
      // Don't update configuration on error, if we've already received configuration
507
      if (!hasDataValue()) {
1✔
508
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
509
            String.format("Error retrieving %s: %s: %s",
1✔
510
              toContextString(), error.getCode(), error.getDescription())));
1✔
511
        maybePublishConfig();
1✔
512
      }
513
    }
1✔
514

515
    @Override
516
    public void onResourceDoesNotExist(String resourceName) {
517
      if (cancelled) {
1✔
518
        return;
×
519
      }
520

521
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
522
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
523
          toContextString() + " does not exist" + nodeInfo()));
1✔
524
      maybePublishConfig();
1✔
525
    }
1✔
526

527
    public void close() {
528
      cancelled = true;
1✔
529
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
530
    }
1✔
531

532
    boolean missingResult() {
533
      return data == null;
1✔
534
    }
535

536
    @Nullable
537
    StatusOr<T> getData() {
538
      return data;
1✔
539
    }
540

541
    boolean hasDataValue() {
542
      return data != null && data.hasValue();
1✔
543
    }
544

545
    String resourceName() {
546
      return resourceName;
×
547
    }
548

549
    protected void setData(T data) {
550
      checkNotNull(data, "data");
1✔
551
      this.data = StatusOr.fromValue(data);
1✔
552
    }
1✔
553

554
    protected void setDataAsStatus(Status status) {
555
      checkNotNull(status, "status");
1✔
556
      this.data = StatusOr.fromStatus(status);
1✔
557
    }
1✔
558

559
    public String toContextString() {
560
      return toContextStr(type.typeName(), resourceName);
1✔
561
    }
562
  }
563

564
  private interface RdsUpdateSupplier {
565
    StatusOr<RdsUpdate> getRdsUpdate();
566
  }
567

568
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
569
      implements RdsUpdateSupplier {
570

571
    private LdsWatcher(String resourceName) {
1✔
572
      super(XdsListenerResource.getInstance(), resourceName);
1✔
573
    }
1✔
574

575
    @Override
576
    public void onChanged(XdsListenerResource.LdsUpdate update) {
577
      checkNotNull(update, "update");
1✔
578
      if (cancelled) {
1✔
579
        return;
1✔
580
      }
581

582
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
583
      List<VirtualHost> virtualHosts;
584
      if (httpConnectionManager == null) {
1✔
585
        // TCP listener. Unsupported config
586
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
587
      } else {
588
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
589
      }
590
      if (virtualHosts != null) {
1✔
591
        updateRoutes(virtualHosts);
1✔
592
      }
593

594
      String rdsName = getRdsName(update);
1✔
595
      if (rdsName != null) {
1✔
596
        addRdsWatcher(rdsName);
1✔
597
      }
598

599
      setData(update);
1✔
600
      maybePublishConfig();
1✔
601
    }
1✔
602

603
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
604
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
605
      if (httpConnectionManager == null) {
1✔
606
        // TCP listener. Unsupported config
607
        return null;
1✔
608
      }
609
      return httpConnectionManager.rdsName();
1✔
610
    }
611

612
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
613
      String rdsName = getRdsName(update);
1✔
614
      if (rdsName == null) {
1✔
615
        return null;
×
616
      }
617
      return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
1✔
618
    }
619

620
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
621
      if (!hasDataValue()) {
1✔
622
        return this;
×
623
      }
624
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
625
      if (hcm == null) {
1✔
626
        return this;
1✔
627
      }
628
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
629
      if (virtualHosts != null) {
1✔
630
        return this;
1✔
631
      }
632
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
633
      assert rdsWatcher != null;
1✔
634
      return rdsWatcher;
1✔
635
    }
636

637
    @Override
638
    public StatusOr<RdsUpdate> getRdsUpdate() {
639
      if (missingResult()) {
1✔
640
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
641
      }
642
      if (!getData().hasValue()) {
1✔
643
        return StatusOr.fromStatus(getData().getStatus());
×
644
      }
645
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
646
      if (hcm == null) {
1✔
647
        return StatusOr.fromStatus(
1✔
648
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
649
      }
650
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
651
      if (virtualHosts == null) {
1✔
652
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
653
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
654
        // bug
655
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
656
      }
657
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
658
    }
659
  }
660

661
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
662

663
    public RdsWatcher(String resourceName) {
1✔
664
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
665
    }
1✔
666

667
    @Override
668
    public void onChanged(RdsUpdate update) {
669
      checkNotNull(update, "update");
1✔
670
      if (cancelled) {
1✔
671
        return;
1✔
672
      }
673
      setData(update);
1✔
674
      updateRoutes(update.virtualHosts);
1✔
675
      maybePublishConfig();
1✔
676
    }
1✔
677

678
    @Override
679
    public StatusOr<RdsUpdate> getRdsUpdate() {
680
      if (missingResult()) {
1✔
681
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
682
      }
683
      return getData();
1✔
684
    }
685
  }
686

687
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
688
    CdsWatcher(String resourceName) {
1✔
689
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
690
    }
1✔
691

692
    @Override
693
    public void onChanged(XdsClusterResource.CdsUpdate update) {
694
      checkNotNull(update, "update");
1✔
695
      if (cancelled) {
1✔
696
        return;
1✔
697
      }
698
      switch (update.clusterType()) {
1✔
699
        case EDS:
700
          setData(update);
1✔
701
          addEdsWatcher(getEdsServiceName());
1✔
702
          break;
1✔
703
        case LOGICAL_DNS:
704
          setData(update);
1✔
705
          // no eds needed
706
          break;
1✔
707
        case AGGREGATE:
708
          setData(update);
1✔
709
          update.prioritizedClusterNames()
1✔
710
              .forEach(name -> addClusterWatcher(name));
1✔
711
          break;
1✔
712
        default:
713
          Status error = Status.UNAVAILABLE.withDescription(
×
714
              "unknown cluster type in " + resourceName() + " " + update.clusterType());
×
715
          setDataAsStatus(error);
×
716
      }
717
      maybePublishConfig();
1✔
718
    }
1✔
719

720
    public String getEdsServiceName() {
721
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
722
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
723
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
724
      if (edsServiceName == null) {
1✔
725
        edsServiceName = cdsUpdate.clusterName();
×
726
      }
727
      return edsServiceName;
1✔
728
    }
729
  }
730

731
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
732
    private EdsWatcher(String resourceName) {
1✔
733
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
734
    }
1✔
735

736
    @Override
737
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
738
      if (cancelled) {
1✔
739
        return;
1✔
740
      }
741
      setData(checkNotNull(update, "update"));
1✔
742
      maybePublishConfig();
1✔
743
    }
1✔
744
  }
745
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc